crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-157: Make setContext and initialize separate function calls in RTNode and elsewhere
Date Tue, 05 Feb 2013 12:26:31 GMT
Updated Branches:
  refs/heads/master 6536f4c90 -> 3e9d6ed64


CRUNCH-157: Make setContext and initialize separate function calls in RTNode and elsewhere


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/3e9d6ed6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/3e9d6ed6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/3e9d6ed6

Branch: refs/heads/master
Commit: 3e9d6ed64315ebf0a4d6f8081ca3c9102c027a70
Parents: 6536f4c
Author: Josh Wills <jwills@apache.org>
Authored: Sun Feb 3 10:18:13 2013 -0800
Committer: Josh Wills <jwills@apache.org>
Committed: Mon Feb 4 19:03:11 2013 -0800

----------------------------------------------------------------------
 crunch/src/main/java/org/apache/crunch/DoFn.java   |    1 -
 .../src/main/java/org/apache/crunch/FilterFn.java  |   22 ++++++-
 .../java/org/apache/crunch/fn/CompositeMapFn.java  |   11 +++-
 .../java/org/apache/crunch/fn/ExtractKeyFn.java    |    8 ++-
 .../main/java/org/apache/crunch/fn/PairMapFn.java  |   13 +++-
 .../crunch/impl/mem/collect/MemCollection.java     |    2 +
 .../java/org/apache/crunch/impl/mr/run/RTNode.java |    1 +
 .../java/org/apache/crunch/lib/SecondarySort.java  |    1 +
 .../org/apache/crunch/types/PGroupedTableType.java |   10 +++-
 .../apache/crunch/types/avro/AvroTableType.java    |   21 +++++-
 .../java/org/apache/crunch/types/avro/Avros.java   |   49 ++++++++++++--
 .../apache/crunch/types/writable/Writables.java    |   50 ++++++++++++---
 .../org/apache/crunch/types/avro/AvrosTest.java    |    4 +-
 13 files changed, 160 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/DoFn.java b/crunch/src/main/java/org/apache/crunch/DoFn.java
index 8d7cc17..2c6389a 100644
--- a/crunch/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch/src/main/java/org/apache/crunch/DoFn.java
@@ -103,7 +103,6 @@ public abstract class DoFn<S, T> implements Serializable {
    */
   public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
     this.context = context;
-    initialize();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/FilterFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/FilterFn.java b/crunch/src/main/java/org/apache/crunch/FilterFn.java
index d635b66..440f122 100644
--- a/crunch/src/main/java/org/apache/crunch/FilterFn.java
+++ b/crunch/src/main/java/org/apache/crunch/FilterFn.java
@@ -91,10 +91,16 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
       for (FilterFn<S> fn : fns) {
         fn.setContext(context);
       }
-      initialize();
     }
     
     @Override
+    public void initialize() {
+      for (FilterFn<S> fn : fns) {
+        fn.initialize();
+      }
+    }
+
+    @Override
     public void cleanup() {
       for (FilterFn<S> fn : fns) {
         fn.cleanup();
@@ -151,7 +157,13 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
       for (FilterFn<S> fn : fns) {
         fn.setContext(context);
       }
-      initialize();
+    }
+    
+    @Override
+    public void initialize() {
+      for (FilterFn<S> fn : fns) {
+        fn.initialize();
+      }
     }
     
     @Override
@@ -207,7 +219,11 @@ public abstract class FilterFn<T> extends DoFn<T, T> {
     @Override
     public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
       base.setContext(context);
-      initialize();
+    }
+    
+    @Override
+    public void initialize() {
+      base.initialize();
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
index 4714fe4..2a8e7d9 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/CompositeMapFn.java
@@ -20,6 +20,7 @@ package org.apache.crunch.fn;
 import org.apache.crunch.Emitter;
 import org.apache.crunch.MapFn;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
 
@@ -32,9 +33,15 @@ public class CompositeMapFn<R, S, T> extends MapFn<R, T> {
   }
 
   @Override
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    first.setContext(context);
+    second.setContext(context);
+  }
+  
+  @Override
   public void initialize() {
-    first.setContext(getContext());
-    second.setContext(getContext());
+    first.initialize();
+    second.initialize();
   }
 
   public MapFn<R, S> getFirst() {

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
index 99ce277..b8cc9df 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/ExtractKeyFn.java
@@ -19,6 +19,7 @@ package org.apache.crunch.fn;
 
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
  * Wrapper function for converting a {@code MapFn} into a key-value pair that is
@@ -33,8 +34,13 @@ public class ExtractKeyFn<K, V> extends MapFn<V, Pair<K, V>> {
   }
 
   @Override
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    mapFn.setContext(context);
+  }
+  
+  @Override
   public void initialize() {
-    this.mapFn.setContext(getContext());
+    mapFn.initialize();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
index b25a6d8..9ee4336 100644
--- a/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
+++ b/crunch/src/main/java/org/apache/crunch/fn/PairMapFn.java
@@ -21,6 +21,7 @@ import org.apache.crunch.Emitter;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
 
@@ -39,12 +40,18 @@ public class PairMapFn<K, V, S, T> extends MapFn<Pair<K, V>, Pair<S, T>> {
   }
 
   @Override
-  public void initialize() {
-    keys.setContext(getContext());
-    values.setContext(getContext());
+  public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+    keys.setContext(context);
+    values.setContext(context);
   }
 
   @Override
+  public void initialize() {
+    keys.initialize();
+    values.initialize();
+  }
+  
+  @Override
   public Pair<S, T> map(Pair<K, V> input) {
     return Pair.of(keys.map(input.first()), values.map(input.second()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index defad1b..cc9f3fc 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -104,6 +104,7 @@ public class MemCollection<S> implements PCollection<S> {
       ParallelDoOptions options) {
     InMemoryEmitter<T> emitter = new InMemoryEmitter<T>();
     doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
+    doFn.initialize();
     for (S s : collect) {
       doFn.process(s, emitter);
     }
@@ -126,6 +127,7 @@ public class MemCollection<S> implements PCollection<S> {
       ParallelDoOptions options) {
     InMemoryEmitter<Pair<K, V>> emitter = new InMemoryEmitter<Pair<K, V>>();
     doFn.setContext(getInMemoryContext(getPipeline().getConfiguration()));
+    doFn.initialize();
     for (S s : collect) {
       doFn.process(s, emitter);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index e30980d..ce7b795 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -64,6 +64,7 @@ public class RTNode implements Serializable {
     }
 
     fn.setContext(ctxt.getContext());
+    fn.initialize();
     for (RTNode child : children) {
       child.initialize(ctxt);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
index 30639b1..54b4396 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/SecondarySort.java
@@ -102,6 +102,7 @@ public class SecondarySort {
     @Override
     public void initialize() {
       intern.setContext(getContext());
+      intern.initialize();
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
index c222c89..e9c773c 100644
--- a/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/PGroupedTableType.java
@@ -28,6 +28,7 @@ import org.apache.crunch.SourceTarget;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.collect.Iterables;
 
@@ -87,10 +88,15 @@ public abstract class PGroupedTableType<K, V> implements PType<Pair<K, Iterable<
       values.configure(conf);
     }
     
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      keys.setContext(context);
+      values.setContext(context);
+    }
+    
     @Override
     public void initialize() {
-      keys.setContext(getContext());
-      values.setContext(getContext());
+      keys.initialize();
+      values.initialize();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
index 31dbd74..86613df 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/AvroTableType.java
@@ -27,6 +27,7 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.TupleDeepCopier;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 /**
  * The implementation of the PTableType interface for Avro-based serialization.
@@ -57,9 +58,15 @@ class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K,
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      keyMapFn.setContext(context);
+      valueMapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      keyMapFn.setContext(getContext());
-      valueMapFn.setContext(getContext());
+      keyMapFn.initialize();
+      valueMapFn.initialize();
       pairSchemaJson = org.apache.avro.mapred.Pair.getPairSchema(
           new Schema.Parser().parse(firstJson), new Schema.Parser().parse(secondJson)).toString();
     }
@@ -93,9 +100,15 @@ class AvroTableType<K, V> extends AvroType<Pair<K, V>> implements PTableType<K,
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      firstMapFn.setContext(context);
+      secondMapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      firstMapFn.setContext(getContext());
-      secondMapFn.setContext(getContext());
+      firstMapFn.initialize();
+      secondMapFn.initialize();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
index c8a2ef5..fc30eaf 100644
--- a/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
+++ b/crunch/src/main/java/org/apache/crunch/types/avro/Avros.java
@@ -56,6 +56,7 @@ import org.apache.crunch.types.TupleFactory;
 import org.apache.crunch.types.writable.WritableDeepCopier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.collect.ImmutableList;
@@ -292,8 +293,13 @@ public class Avros {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      this.mapFn.setContext(getContext());
+      mapFn.initialize();
     }
 
     @Override
@@ -331,8 +337,13 @@ public class Avros {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      this.mapFn.setContext(getContext());
+      mapFn.initialize();
     }
 
     @Override
@@ -369,8 +380,13 @@ public class Avros {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      this.mapFn.setContext(getContext());
+      mapFn.initialize();
     }
 
     @Override
@@ -396,8 +412,13 @@ public class Avros {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      this.mapFn.setContext(getContext());
+      this.mapFn.initialize();
     }
 
     @Override
@@ -441,9 +462,16 @@ public class Avros {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        fn.setContext(context);
+      }
+    }
+    
+    @Override
     public void initialize() {
       for (MapFn fn : fns) {
-        fn.setContext(getContext());
+        fn.initialize();
       }
       this.values = new Object[fns.size()];
       tupleFactory.initialize();
@@ -499,12 +527,19 @@ public class Avros {
         fn.configure(conf);
       }
     }
-
+ 
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        fn.setContext(getContext());
+      }
+    }
+    
     @Override
     public void initialize() {
       this.schema = new Schema.Parser().parse(jsonSchema);
       for (MapFn fn : fns) {
-        fn.setContext(getContext());
+        fn.initialize();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
index 67f0621..25ae370 100644
--- a/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
+++ b/crunch/src/main/java/org/apache/crunch/types/writable/Writables.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -297,9 +298,16 @@ public class Writables {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        fn.setContext(context);
+      }
+    }
+    
+    @Override
     public void initialize() {
       for (MapFn fn : fns) {
-        fn.setContext(getContext());
+        fn.initialize();
       }
       // The rest of the methods allocate new
       // objects each time. However this one
@@ -346,13 +354,19 @@ public class Writables {
       }
     }
 
-
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      for (MapFn fn : fns) {
+        fn.setContext(context);
+      }
+    }
+    
     @Override
     public void initialize() {
       this.values = new Writable[fns.size()];
       this.writable = new TupleWritable(values);
       for (MapFn fn : fns) {
-        fn.setContext(getContext());
+        fn.initialize();
       }
     }
 
@@ -426,10 +440,14 @@ public class Writables {
       mapFn.configure(conf);
     }
 
-
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
     @Override
     public void initialize() {
-      mapFn.setContext(getContext());
+      mapFn.initialize();
     }
 
     @Override
@@ -457,10 +475,14 @@ public class Writables {
       mapFn.configure(conf);
     }
 
-
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
     @Override
     public void initialize() {
-      mapFn.setContext(getContext());
+      mapFn.initialize();
     }
 
     @Override
@@ -495,8 +517,13 @@ public class Writables {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      mapFn.setContext(getContext());
+      mapFn.initialize();
     }
 
     @Override
@@ -525,8 +552,13 @@ public class Writables {
     }
 
     @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      mapFn.setContext(context);
+    }
+    
+    @Override
     public void initialize() {
-      mapFn.setContext(getContext());
+      mapFn.initialize();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/3e9d6ed6/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
index dabf0fe..5622a56 100644
--- a/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
+++ b/crunch/src/test/java/org/apache/crunch/types/avro/AvrosTest.java
@@ -192,8 +192,10 @@ public class AvrosTest {
     
     TaskInputOutputContext<?, ?, ?, ?> testContext = CrunchTestSupport.getTestContext(new Configuration());
     at.getInputMapFn().setContext(testContext);
+    at.getInputMapFn().initialize();
     at.getOutputMapFn().setContext(testContext);
-
+    at.getOutputMapFn().initialize();
+    
     LongWritable lw = new LongWritable(1729L);
     assertEquals(lw, at.getInputMapFn().map(at.getOutputMapFn().map(lw)));
   }


Mime
View raw message