crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: jwi...@apache.org
Subject: git commit: CRUNCH-455: Support custom rawcomparators for sorting in the in-memory mode
Date: Mon, 11 Aug 2014 21:57:56 GMT
Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 21922e756 -> dd4a75675


CRUNCH-455: Support custom rawcomparators for sorting in the in-memory mode


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/dd4a7567
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/dd4a7567
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/dd4a7567

Branch: refs/heads/apache-crunch-0.8
Commit: dd4a75675b6d21f427e3f04f3a5371658b2a01c7
Parents: 21922e7
Author: Josh Wills <jwills@apache.org>
Authored: Mon Aug 11 11:06:10 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Aug 11 14:57:36 2014 -0700

----------------------------------------------------------------------
 .../impl/mem/collect/MemGroupedTable.java       | 13 +++-
 .../crunch/impl/mem/collect/Shuffler.java       | 64 +++++++++++++++-----
 .../crunch/types/avro/AvroKeyConverter.java     | 14 +----
 .../java/org/apache/crunch/lib/SortTest.java    | 41 +++++++++++++
 4 files changed, 104 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/dd4a7567/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 172fe36..6efc062 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -24,6 +24,8 @@ import java.util.TreeMap;
 
 import org.apache.crunch.Aggregator;
 import org.apache.crunch.CombineFn;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
@@ -137,6 +139,15 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
   
   @Override
   public PTable<K, V> ungroup() {
-    return parent;
+    return parallelDo("ungroup", new UngroupFn<K, V>(), parent.getPTableType());
+  }
+
+  private static class UngroupFn<K, V> extends DoFn<Pair<K, Iterable<V>>,
Pair<K, V>> {
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>>
emitter) {
+      for (V v : input.second()) {
+        emitter.emit(Pair.of(input.first(), v));
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/dd4a7567/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
index 2e8f9eb..489fab3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/Shuffler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.crunch.impl.mem.collect;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -26,12 +27,14 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.SingleUseIterable;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Function;
@@ -57,49 +60,80 @@ abstract class Shuffler<K, V> implements Iterable<Pair<K, Iterable<V>>> {
   
   public static <S, T> Shuffler<S, T> create(PType<S> keyType, GroupingOptions
options,
       Pipeline pipeline) {
-    Map<S, Collection<T>> map = getMapForKeyType(keyType);
+    Map<Object, Collection<T>> map = getMapForKeyType(keyType);
     
     if (options != null) {
+      Job job;
+      try {
+        job = new Job(pipeline.getConfiguration());
+      } catch (IOException e) {
+        throw new IllegalStateException("Could not create Job instance", e);
+      }
+      options.configure(job);
       if (Pair.class.equals(keyType.getTypeClass()) && options.getGroupingComparatorClass()
!= null) {
         PType<?> pairKey = keyType.getSubTypes().get(0);
         return new SecondarySortShuffler(getMapForKeyType(pairKey));
       } else if (options.getSortComparatorClass() != null) {
-        RawComparator<S> rc = ReflectionUtils.newInstance(options.getSortComparatorClass(),
-            pipeline.getConfiguration());
-        map = new TreeMap<S, Collection<T>>(rc);
+        RawComparator rc = ReflectionUtils.newInstance(
+            options.getSortComparatorClass(),
+            job.getConfiguration());
+        map = new TreeMap<Object, Collection<T>>(rc);
+        return new MapShuffler<S, T>(map, keyType);
       }
     }
-    
     return new MapShuffler<S, T>(map);
   }
   
-  private static class HFunction<K, V> implements Function<Map.Entry<K, Collection<V>>,
Pair<K, Iterable<V>>> {
+  private static class HFunction<K, V> implements Function<Map.Entry<Object,
Collection<V>>, Pair<K, Iterable<V>>> {
+    private final PType<K> keyType;
+
+    public HFunction(PType<K> keyType) {
+      this.keyType = keyType;
+    }
+
     @Override
-    public Pair<K, Iterable<V>> apply(Map.Entry<K, Collection<V>>
input) {
-      return Pair.<K, Iterable<V>>of(input.getKey(), new SingleUseIterable<V>(input.getValue()));
+    public Pair<K, Iterable<V>> apply(Map.Entry<Object, Collection<V>>
input) {
+      K key;
+      if (keyType == null) {
+        key = (K) input.getKey();
+      } else {
+        Object k = keyType.getConverter().convertInput(input.getKey(), null);
+        key = keyType.getInputMapFn().map(k);
+      }
+      return Pair.<K, Iterable<V>>of(key, new SingleUseIterable<V>(input.getValue()));
     }
   }
   
   private static class MapShuffler<K, V> extends Shuffler<K, V> {
-    private final Map<K, Collection<V>> map;
-    
-    public MapShuffler(Map<K, Collection<V>> map) {
+    private final Map<Object, Collection<V>> map;
+    private final PType<K> keyType;
+
+    public MapShuffler(Map<Object, Collection<V>> map) {
+      this(map, null);
+    }
+
+    public MapShuffler(Map<Object, Collection<V>> map, PType<K> keyType)
{
       this.map = map;
+      this.keyType = keyType;
     }
     
     @Override
     public Iterator<Pair<K, Iterable<V>>> iterator() {
       return Iterators.transform(map.entrySet().iterator(),
-          new HFunction<K, V>());
+          new HFunction<K, V>(keyType));
     }
 
     @Override
     public void add(Pair<K, V> record) {
-      if (!map.containsKey(record.first())) {
+      Object key = record.first();
+      if (keyType != null) {
+        key = keyType.getConverter().outputKey(keyType.getOutputMapFn().map((K) key));
+      }
+      if (!map.containsKey(key)) {
         Collection<V> values = Lists.newArrayList();
-        map.put(record.first(), values);
+        map.put(key, values);
       }
-      map.get(record.first()).add(record.second());
+      map.get(key).add(record.second());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/dd4a7567/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
index d59e9a9..15ed8bc 100644
--- a/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/types/avro/AvroKeyConverter.java
@@ -24,8 +24,6 @@ import org.apache.hadoop.io.NullWritable;
 
 class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable,
K, Iterable<K>> {
 
-  private transient AvroWrapper<K> wrapper = null;
-
   @Override
   public K convertInput(AvroWrapper<K> key, NullWritable value) {
     return key.datum();
@@ -33,8 +31,7 @@ class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K,
 
   @Override
   public AvroWrapper<K> outputKey(K value) {
-    getWrapper().datum(value);
-    return wrapper;
+    return new AvroKey<K>(value);
   }
 
   @Override
@@ -44,7 +41,7 @@ class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K,
 
   @Override
   public Class<AvroWrapper<K>> getKeyClass() {
-    return (Class<AvroWrapper<K>>) getWrapper().getClass();
+    return (Class<AvroWrapper<K>>) (Class) AvroKey.class;
   }
 
   @Override
@@ -52,13 +49,6 @@ class AvroKeyConverter<K> implements Converter<AvroWrapper<K>, NullWritable, K,
     return NullWritable.class;
   }
 
-  private AvroWrapper<K> getWrapper() {
-    if (wrapper == null) {
-      wrapper = new AvroKey<K>();
-    }
-    return wrapper;
-  }
-
   @Override
   public Iterable<K> convertIterableInput(AvroWrapper<K> key, Iterable<NullWritable>
value) {
     throw new UnsupportedOperationException("Should not be possible");

http://git-wip-us.apache.org/repos/asf/crunch/blob/dd4a7567/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java
new file mode 100644
index 0000000..d1f1fa0
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/SortTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import static org.apache.crunch.types.avro.Avros.longs;
+import static org.apache.crunch.types.avro.Avros.pairs;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.junit.Assert.assertEquals;
+
+public class SortTest {
+
+  @Test
+  public void testInMemoryReverseAvro() throws Exception {
+    PCollection<Pair<String, Long>> pc = MemPipeline.typedCollectionOf(pairs(strings(),
longs()),
+        Pair.of("a", 1L), Pair.of("c", 7L), Pair.of("b", 10L));
+    PCollection<Pair<String, Long>> sorted = Sort.sortPairs(pc, Sort.ColumnOrder.by(2,
Sort.Order.DESCENDING));
+    assertEquals(ImmutableList.of(Pair.of("b", 10L), Pair.of("c", 7L), Pair.of("a", 1L)),
+        ImmutableList.copyOf(sorted.materialize()));
+  }
+}


Mime
View raw message