beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: Move HashingFn to io/common, switch to better hash
Date Tue, 11 Apr 2017 15:16:01 GMT
Repository: beam
Updated Branches:
  refs/heads/master 84a96297c -> c46b256d7


Move HashingFn to io/common, switch to better hash


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b615013b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b615013b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b615013b

Branch: refs/heads/master
Commit: b615013b9c941038d3e9fd96a153f0894f52f183
Parents: 84a9629
Author: Stephen Sisk <sisk@google.com>
Authored: Fri Apr 7 12:59:28 2017 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Apr 11 08:15:49 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/common/pom.xml                     |   4 +
 .../apache/beam/sdk/io/common/HashingFn.java    | 109 +++++++++++++++++++
 sdks/java/io/hadoop/jdk1.8-tests/pom.xml        |  46 +-------
 .../inputformat/HIFIOWithElasticTest.java       |   6 +-
 .../hadoop/inputformat/hashing/HashingFn.java   | 109 -------------------
 .../integration/tests/HIFIOCassandraIT.java     |   2 +-
 .../integration/tests/HIFIOElasticIT.java       |   2 +-
 7 files changed, 124 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/common/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/pom.xml b/sdks/java/io/common/pom.xml
index fa51b47..3f6d79d 100644
--- a/sdks/java/io/common/pom.xml
+++ b/sdks/java/io/common/pom.xml
@@ -34,5 +34,9 @@
         <groupId>org.apache.beam</groupId>
         <artifactId>beam-sdks-java-core</artifactId>
       </dependency>
+      <dependency>
+        <groupId>com.google.guava</groupId>
+        <artifactId>guava</artifactId>
+      </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
new file mode 100644
index 0000000..d534c87
--- /dev/null
+++ b/sdks/java/io/common/src/test/java/org/apache/beam/sdk/io/common/HashingFn.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.beam.sdk.io.common;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+
+/**
+ * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a
+ * HashCode.
+ */
+public class HashingFn extends CombineFn<String, HashingFn.Accum, String> {
+
+  /**
+   * Serializable Class to store the HashCode of input String.
+   */
+  public static class Accum implements Serializable {
+    HashCode hashCode = null;
+
+    public Accum(HashCode value) {
+      this.hashCode = value;
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+      in.defaultReadObject();
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+      out.defaultWriteObject();
+    }
+  }
+
+  @Override
+  public Accum addInput(Accum accum, String input) {
+    List<HashCode> elementHashes = Lists.newArrayList();
+     if (accum.hashCode != null) {
+      elementHashes.add(accum.hashCode);
+    }
+    HashCode inputHashCode = Hashing.murmur3_128().hashString(input, StandardCharsets.UTF_8);
+    elementHashes.add(inputHashCode);
+    accum.hashCode = Hashing.combineUnordered(elementHashes);
+    return accum;
+  }
+
+  @Override
+  public Accum mergeAccumulators(Iterable<Accum> accums) {
+    Accum merged = createAccumulator();
+    List<HashCode> elementHashes = Lists.newArrayList();
+    for (Accum accum : accums) {
+      if (accum.hashCode != null) {
+        elementHashes.add(accum.hashCode);
+      }
+    }
+    merged.hashCode = Hashing.combineUnordered(elementHashes);
+    return merged;
+  }
+
+  @Override
+  public String extractOutput(Accum accum) {
+    // Return the combined hash code of list of elements in the Pcollection.
+    String consolidatedHash = "";
+    if (accum.hashCode != null) {
+      consolidatedHash = accum.hashCode.toString();
+    }
+    return consolidatedHash;
+  }
+
+  @Override
+  public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder)
+      throws CannotProvideCoderException {
+    return SerializableCoder.of(Accum.class);
+  }
+
+  @Override
+  public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) {
+    return inputCoder;
+  }
+
+  @Override
+  public Accum createAccumulator() {
+    return new Accum(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
index 4c510ae..84b923a 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
+++ b/sdks/java/io/hadoop/jdk1.8-tests/pom.xml
@@ -36,41 +36,6 @@
 
   <build>
     <plugins>
-      <plugin>
-       <!-- Guava shading is required as Cassandra tests require version
-       19 of Guava, by default project wide Guava shading may not suffice as it
-       loads a different version of guava which will not work for Cassandra tests -->
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-shade-plugin</artifactId>
-        <executions>
-          <execution>
-            <phase>package</phase>
-            <goals>
-              <goal>shade</goal>
-            </goals>
-            <configuration>
-              <artifactSet>
-                <includes>
-                  <include>com.google.guava:guava:19.0</include>
-                </includes>
-              </artifactSet>
-              <relocations>
-                <relocation>
-                  <pattern>com.google.common</pattern>
-                  <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.common</shadedPattern>
-                </relocation>
-               <relocation>
-                 <pattern>com.google.thirdparty</pattern>
-                 <shadedPattern>org.apache.beam.sdk.io.hadoop.jdk1.8-tests.repackaged.com.google.thirdparty</shadedPattern>
-                 </relocation>
-               </relocations>
-               <transformers>
-                 <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
-               </transformers>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
       <!-- Overridden enforcer plugin for JDK1.8 for running tests -->
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
@@ -178,11 +143,6 @@
       <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
     </dependency>
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
@@ -208,6 +168,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-io-common</artifactId>
+      <scope>test</scope>
+      <classifier>tests</classifier>
+    </dependency>
+    <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-core</artifactId>
       <version>2.6.2</version>

http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
index 599a4a1..51cbd5a 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/HIFIOWithElasticTest.java
@@ -25,7 +25,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
+import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
@@ -105,7 +105,7 @@ public class HIFIOWithElasticTest implements Serializable {
   @Test
   public void testHifIOWithElastic() {
     // Expected hashcode is evaluated during insertion time one time and hardcoded here.
-    String expectedHashCode = "e2098f431f90193aa4545e033e6fd2217aafe7b6";
+    String expectedHashCode = "a62a85f5f081e3840baf1028d4d6c6bc";
     Configuration conf = getConfiguration();
     PCollection<KV<Text, LinkedMapWritable>> esData =
         pipeline.apply(HadoopInputFormatIO.<Text, LinkedMapWritable>read().withConfiguration(conf));
@@ -135,7 +135,7 @@ public class HIFIOWithElasticTest implements Serializable {
   @Test
   public void testHifIOWithElasticQuery() {
     long expectedRowCount = 1L;
-    String expectedHashCode = "caa37dbd8258e3a7f98932958c819a57aab044ec";
+    String expectedHashCode = "cfbf3e5c993d44e57535a114e25f782d";
     Configuration conf = getConfiguration();
     String fieldValue = ELASTIC_TYPE_ID_PREFIX + "2";
     String query = "{"

http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
deleted file mode 100644
index fe37048..0000000
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/hashing/HashingFn.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.beam.sdk.io.hadoop.inputformat.hashing;
-
-import com.google.common.collect.Lists;
-import com.google.common.hash.HashCode;
-import com.google.common.hash.Hashing;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-
-/**
- * Custom Function for Hashing. The combiner is combineUnordered, and accumulator is a
- * HashCode.
- */
-public class HashingFn extends CombineFn<String, HashingFn.Accum, String> {
-
-  /**
-   * Serializable Class to store the HashCode of input String.
-   */
-  public static class Accum implements Serializable {
-    HashCode hashCode = null;
-
-    public Accum(HashCode value) {
-      this.hashCode = value;
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-      in.defaultReadObject();
-    }
-
-    private void writeObject(ObjectOutputStream out) throws IOException {
-      out.defaultWriteObject();
-    }
-  }
-
-  @Override
-  public Accum addInput(Accum accum, String input) {
-    List<HashCode> elementHashes = Lists.newArrayList();
-     if (accum.hashCode != null) {
-      elementHashes.add(accum.hashCode);
-    }
-    HashCode inputHashCode = Hashing.sha1().hashString(input, StandardCharsets.UTF_8);
-    elementHashes.add(inputHashCode);
-    accum.hashCode = Hashing.combineUnordered(elementHashes);
-    return accum;
-  }
-
-  @Override
-  public Accum mergeAccumulators(Iterable<Accum> accums) {
-    Accum merged = createAccumulator();
-    List<HashCode> elementHashes = Lists.newArrayList();
-    for (Accum accum : accums) {
-      if (accum.hashCode != null) {
-        elementHashes.add(accum.hashCode);
-      }
-    }
-    merged.hashCode = Hashing.combineUnordered(elementHashes);
-    return merged;
-  }
-
-  @Override
-  public String extractOutput(Accum accum) {
-    // Return the combined hash code of list of elements in the Pcollection.
-    String consolidatedHash = "";
-    if (accum.hashCode != null) {
-      consolidatedHash = accum.hashCode.toString();
-    }
-    return consolidatedHash;
-  }
-
-  @Override
-  public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<String> inputCoder)
-      throws CannotProvideCoderException {
-    return SerializableCoder.of(Accum.class);
-  }
-
-  @Override
-  public Coder<String> getDefaultOutputCoder(CoderRegistry registry, Coder<String> inputCoder) {
-    return inputCoder;
-  }
-
-  @Override
-  public Accum createAccumulator() {
-    return new Accum(null);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
index bf9a5fd..bf4cb92 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOCassandraIT.java
@@ -21,9 +21,9 @@ import com.datastax.driver.core.Row;
 
 import java.io.Serializable;
 
+import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
 import org.apache.beam.sdk.io.hadoop.inputformat.custom.options.HIFTestOptions;
-import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/b615013b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
index 13c0cbc..65ef8f2 100644
--- a/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
+++ b/sdks/java/io/hadoop/jdk1.8-tests/src/test/java/org/apache/beam/sdk/io/hadoop/inputformat/integration/tests/HIFIOElasticIT.java
@@ -17,9 +17,9 @@ package org.apache.beam.sdk.io.hadoop.inputformat.integration.tests;
 import java.io.IOException;
 import java.io.Serializable;
 
+import org.apache.beam.sdk.io.common.HashingFn;
 import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
 import org.apache.beam.sdk.io.hadoop.inputformat.custom.options.HIFTestOptions;
-import org.apache.beam.sdk.io.hadoop.inputformat.hashing.HashingFn;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;


Mime
View raw message