crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-501: Detach PType values before calling Aggregator methods
Date Tue, 24 Feb 2015 17:56:58 GMT
Repository: crunch
Updated Branches:
  refs/heads/master cd4b3887b -> d7443e392


CRUNCH-501: Detach PType values before calling Aggregator methods


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d7443e39
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d7443e39
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d7443e39

Branch: refs/heads/master
Commit: d7443e3927aaec99563d8a48fe1341c5d5b8f85a
Parents: cd4b388
Author: Josh Wills <jwills@apache.org>
Authored: Mon Feb 23 19:35:53 2015 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Feb 23 19:35:53 2015 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/io/avro/AvroModeIT.java   |  6 ++---
 .../java/org/apache/crunch/fn/Aggregators.java  | 28 +++++++++++++++++---
 .../impl/dist/collect/BaseGroupedTable.java     |  5 ++--
 .../impl/mem/collect/MemGroupedTable.java       |  5 ++--
 4 files changed, 33 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
index ff66fd7..8fa55eb 100644
--- a/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/io/avro/AvroModeIT.java
@@ -57,12 +57,12 @@ public class AvroModeIT implements Serializable {
       "  ]\n" +
       "}");
 
-  static final class FloatArray {
+  public static final class FloatArray {
     private final float[] values;
-    FloatArray() {
+    public FloatArray() {
       this(null);
     }
-    FloatArray(float[] values) {
+    public FloatArray(float[] values) {
       this.values = values;
     }
     float[] getValues() {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index 084cca4..e7aeb18 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -32,6 +32,7 @@ import org.apache.crunch.Tuple;
 import org.apache.crunch.Tuple3;
 import org.apache.crunch.Tuple4;
 import org.apache.crunch.TupleN;
+import org.apache.crunch.types.PType;
 import org.apache.crunch.util.Tuples;
 import org.apache.hadoop.conf.Configuration;
 
@@ -439,9 +440,23 @@ public final class Aggregators {
    *
    * @param aggregator The instance to wrap
    * @return A {@link CombineFn} delegating to {@code aggregator}
+   *
+   * @deprecated use the safer {@link #toCombineFn(Aggregator, PType)} instead.
    */
+  @Deprecated
  public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator) {
-    return new AggregatorCombineFn<K, V>(aggregator);
+    return toCombineFn(aggregator, null);
+  }
+
+  /**
+   * Wrap a {@link CombineFn} adapter around the given aggregator.
+   *
+   * @param aggregator The instance to wrap
+   * @param ptype The PType of the aggregated value (for detaching complex objects)
+   * @return A {@link CombineFn} delegating to {@code aggregator}
+   */
+  public static final <K, V> CombineFn<K, V> toCombineFn(Aggregator<V> aggregator, PType<V> ptype) {
+    return new AggregatorCombineFn<K, V>(aggregator, ptype);
   }
 
   /**
@@ -460,22 +475,27 @@ public final class Aggregators {
    */
   private static class AggregatorCombineFn<K, V> extends CombineFn<K, V> {
     // TODO: Has to be fully qualified until CombineFn.Aggregator can be removed.
-    private final org.apache.crunch.Aggregator<V> aggregator;
+    private final Aggregator<V> aggregator;
+    private final PType<V> ptype;
 
-    public AggregatorCombineFn(org.apache.crunch.Aggregator<V> aggregator) {
+    public AggregatorCombineFn(Aggregator<V> aggregator, PType<V> ptype) {
       this.aggregator = aggregator;
+      this.ptype = ptype;
     }
 
     @Override
     public void initialize() {
       aggregator.initialize(getConfiguration());
+      if (ptype != null) {
+        ptype.initialize(getConfiguration());
+      }
     }
 
     @Override
     public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
       aggregator.reset();
       for (V v : input.second()) {
-        aggregator.update(v);
+        aggregator.update(ptype == null ? v : ptype.getDetachedValue(v));
       }
       for (V v : aggregator.results()) {
         emitter.emit(Pair.of(input.first(), v));

http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
index 064bba8..d87c8f5 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/BaseGroupedTable.java
@@ -90,12 +90,13 @@ public class BaseGroupedTable<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>
 
   @Override
   public PTable<K, V> combineValues(Aggregator<V> agg) {
-    return combineValues(Aggregators.<K, V>toCombineFn(agg));
+    return combineValues(Aggregators.<K, V>toCombineFn(agg, parent.getValueType()));
   }
 
   @Override
   public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) {
-    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
+    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg, parent.getValueType()),
+        Aggregators.<K, V>toCombineFn(reduceAgg, parent.getValueType()));
   }
 
   private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {

http://git-wip-us.apache.org/repos/asf/crunch/blob/d7443e39/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 6efc062..0e4516a 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -119,12 +119,13 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
 
   @Override
   public PTable<K, V> combineValues(Aggregator<V> agg) {
-    return combineValues(Aggregators.<K, V>toCombineFn(agg));
+    return combineValues(Aggregators.<K, V>toCombineFn(agg, parent.getValueType()));
   }
 
   @Override
   public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) {
-    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
+    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg, parent.getValueType()),
+        Aggregators.<K, V>toCombineFn(reduceAgg, parent.getValueType()));
   }
 
   @Override


Mime
View raw message