crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gr...@apache.org
Subject crunch git commit: CRUNCH-527 Use hash smearing for partitioning
Date Tue, 26 May 2015 18:47:09 GMT
Repository: crunch
Updated Branches:
  refs/heads/master cddba97c5 -> 06688d55e


CRUNCH-527 Use hash smearing for partitioning

Apply a supplemental "smearing" function to hash codes when
partitioning data. The UniformHashPartitioner (which applies this
technique) is now the default partitioner.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/06688d55
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/06688d55
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/06688d55

Branch: refs/heads/master
Commit: 06688d55ee306a52d243b10de017ebeec913577a
Parents: cddba97
Author: Gabriel Reid <greid@apache.org>
Authored: Tue May 26 09:42:44 2015 +0200
Committer: Gabriel Reid <greid@apache.org>
Committed: Tue May 26 09:42:44 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/crunch/GroupingOptions.java |  3 +-
 .../impl/mr/run/UniformHashPartitioner.java     | 34 ++++++++++++++
 .../org/apache/crunch/lib/join/JoinUtils.java   |  8 ++--
 .../java/org/apache/crunch/util/HashUtil.java   | 42 +++++++++++++++++
 .../impl/mr/run/UniformHashPartitionerTest.java | 47 ++++++++++++++++++++
 .../lib/AvroIndexedRecordPartitionerTest.java   |  4 +-
 .../lib/TupleWritablePartitionerTest.java       |  2 +-
 7 files changed, 131 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
index 59abe27..1616a46 100644
--- a/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
+++ b/crunch-core/src/main/java/org/apache/crunch/GroupingOptions.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.crunch.impl.mr.run.UniformHashPartitioner;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Partitioner;
@@ -127,7 +128,7 @@ public class GroupingOptions implements Serializable {
    * 
    */
   public static class Builder {
-    private Class<? extends Partitioner> partitionerClass;
+    private Class<? extends Partitioner> partitionerClass = UniformHashPartitioner.class;
     private Class<? extends RawComparator> groupingComparatorClass;
     private Class<? extends RawComparator> sortComparatorClass;
     private boolean requireSortedKeys;

http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java
b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java
new file mode 100644
index 0000000..c5b8459
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/UniformHashPartitioner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.impl.mr.run;
+
+import org.apache.crunch.util.HashUtil;
+import org.apache.hadoop.mapreduce.Partitioner;
+
+/**
+ * Hash partitioner which applies a supplemental hashing function to the hash code of keys to ensure better
+ * distribution of keys over partitions.
+ */
+public class UniformHashPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>
{
+
+  @Override
+  public int getPartition(KEY key, VALUE value, int numPartitions) {
+    return ((HashUtil.smearHash(key.hashCode()) & Integer.MAX_VALUE) % numPartitions);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
index 02963a7..ba532f2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/JoinUtils.java
@@ -21,14 +21,12 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.mapred.AvroJob;
-import org.apache.avro.mapred.AvroKey;
-import org.apache.avro.mapred.AvroValue;
 import org.apache.avro.mapred.AvroWrapper;
-import org.apache.avro.reflect.ReflectData;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroMode;
 import org.apache.crunch.types.writable.TupleWritable;
 import org.apache.crunch.types.writable.WritableTypeFamily;
+import org.apache.crunch.util.HashUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.DataInputBuffer;
@@ -61,7 +59,7 @@ public class JoinUtils {
   public static class TupleWritablePartitioner extends Partitioner<TupleWritable, Writable>
{
     @Override
     public int getPartition(TupleWritable key, Writable value, int numPartitions) {
-      return (key.get(0).hashCode() & Integer.MAX_VALUE) % numPartitions;
+      return (HashUtil.smearHash(key.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
     }
   }
 
@@ -103,7 +101,7 @@ public class JoinUtils {
       } else {
         throw new UnsupportedOperationException("Unknown avro key type: " + key);
       }
-      return (record.get(0).hashCode() & Integer.MAX_VALUE) % numPartitions;
+      return (HashUtil.smearHash(record.get(0).hashCode()) & Integer.MAX_VALUE) % numPartitions;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java b/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java
new file mode 100644
index 0000000..aed1348
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/util/HashUtil.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.util;
+
+/**
+ * Utility methods for working with hash codes.
+ */
+public class HashUtil {
+
+  /**
+   * Applies a supplemental hashing function to an integer, increasing variability in lower-order bits.
+   * This method is intended to avoid collisions in functions which rely on variance in the lower bits of a hash
+   * code (e.g. hash partitioning).
+   */
+  // The following comments and code are taken directly from Guava's com.google.common.collect.Hashing class
+  // This method was written by Doug Lea with assistance from members of JCP
+  // JSR-166 Expert Group and released to the public domain, as explained at
+  // http://creativecommons.org/licenses/publicdomain
+  //
+  // As of 2010/06/11, this method is identical to the (package private) hash
+  // method in OpenJDK 7's java.util.HashMap class.
+  public static int smearHash(int hashCode) {
+    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
+    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java
b/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java
new file mode 100644
index 0000000..1518c1b
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/run/UniformHashPartitionerTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.impl.mr.run;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.*;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import org.junit.Test;
+
+public class UniformHashPartitionerTest {
+
+  private static final UniformHashPartitioner INSTANCE = new UniformHashPartitioner();
+
+  // Simple test to ensure that the general idea behind this partitioner is working.
+  // We create 100 keys that are all multiples of 10 (0, 10, ..., 990), and partition them into 10 buckets,
+  // and then verify that every bucket got more than 5% of the keys. The default HashPartitioner would put
+  // everything in the same bucket.
+  @Test
+  public void testGetPartition() {
+    Multiset<Integer> partitionCounts = HashMultiset.create();
+    final int NUM_PARTITIONS = 10;
+    for (int i = 0; i < 1000; i += 10) {
+      partitionCounts.add(INSTANCE.getPartition(i, i, NUM_PARTITIONS));
+    }
+    for (int partitionNumber = 0; partitionNumber < NUM_PARTITIONS; partitionNumber++)
{
+      assertThat(partitionCounts.count(partitionNumber), greaterThan(5));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
index ac8b89c..36bbd12 100644
--- a/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/AvroIndexedRecordPartitionerTest.java
@@ -50,7 +50,7 @@ public class AvroIndexedRecordPartitionerTest {
     IndexedRecord indexedRecord = new MockIndexedRecord(-3);
     AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
 
-    assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(),
5));
+    assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(),
5));
     assertEquals(1, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(),
2));
   }
 
@@ -59,7 +59,7 @@ public class AvroIndexedRecordPartitionerTest {
     IndexedRecord indexedRecord = new MockIndexedRecord(Integer.MIN_VALUE);
     AvroKey<IndexedRecord> avroKey = new AvroKey<IndexedRecord>(indexedRecord);
 
-    assertEquals(0, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(),
Integer.MAX_VALUE));
+    assertEquals(151558288, avroPartitioner.getPartition(avroKey, new AvroValue<Object>(),
Integer.MAX_VALUE));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/06688d55/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
index 86c7437..6a3c0a8 100644
--- a/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TupleWritablePartitionerTest.java
@@ -43,7 +43,7 @@ public class TupleWritablePartitionerTest {
     IntWritable intWritable = new IntWritable(3);
     BytesWritable bw = new BytesWritable(WritableUtils.toByteArray(intWritable));
     TupleWritable key = new TupleWritable(new Writable[] { bw });
-    assertEquals(2, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
+    assertEquals(4, tupleWritableParitioner.getPartition(key, NullWritable.get(), 5));
     assertEquals(0, tupleWritableParitioner.getPartition(key, NullWritable.get(), 2));
   }
 }


Mime
View raw message