crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From: jwi...@apache.org
Subject: git commit: CRUNCH-232: Ensure that all nodes are cleaned up during joins (or any Crunch job involving unions)
Date: Wed, 03 Jul 2013 01:20:52 GMT
Updated Branches:
  refs/heads/master dea3fd93e -> 4983a0ca8


CRUNCH-232: Ensure that all nodes are cleaned up during joins (or any Crunch job involving unions)


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/4983a0ca
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/4983a0ca
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/4983a0ca

Branch: refs/heads/master
Commit: 4983a0ca85d814e29cf60d3588dc2a38ce3b5aa0
Parents: dea3fd9
Author: Josh Wills <jwills@apache.org>
Authored: Tue Jul 2 18:01:38 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Tue Jul 2 18:01:38 2013 -0700

----------------------------------------------------------------------
 .../crunch/lib/join/MapsideJoinStrategyIT.java  | 22 ++++++++---------
 .../apache/crunch/impl/mr/run/CrunchMapper.java | 16 +++++--------
 .../crunch/impl/mr/run/CrunchReducer.java       | 17 ++++---------
 .../crunch/impl/mr/run/CrunchTaskContext.java   | 25 ++++++++++----------
 .../org/apache/crunch/impl/mr/run/RTNode.java   |  8 +++++--
 5 files changed, 41 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
index c459ad8..09723d2 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/join/MapsideJoinStrategyIT.java
@@ -30,7 +30,6 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.PipelineResult;
 import org.apache.crunch.fn.FilterFns;
-import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.test.TemporaryPath;
@@ -72,14 +71,14 @@ public class MapsideJoinStrategyIT {
     }
   }
 
-  private static class CapOrdersFn extends MapValuesFn<Integer, String, String> {
+  private static class CapOrdersFn extends MapFn<String, String> {
     @Override
     public String map(String v) {
       return v.toUpperCase();
     }
   }
   
-  private static class ConcatValuesFn extends MapValuesFn<Integer, Pair<String, String>,
String> {
+  private static class ConcatValuesFn extends MapFn<Pair<String, String>, String>
{
     @Override
     public String map(Pair<String, String> v) {
       return v.toString();
@@ -109,7 +108,7 @@ public class MapsideJoinStrategyIT {
         .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());
 
     
-    MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>();
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>();
     PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable,
filteredOrderTable, JoinType.INNER_JOIN);
 
     List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());
@@ -131,11 +130,11 @@ public class MapsideJoinStrategyIT {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
     
-    MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>();
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>();
     PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable,
JoinType.INNER_JOIN)
-        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
+        .mapValues("concat", new ConcatValuesFn(), Writables.strings());
 
-    PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(),
orderTable.getPTableType());
+    PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
     
     PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders,
ORDER_TABLE, JoinType.INNER_JOIN);
 
@@ -163,11 +162,11 @@ public class MapsideJoinStrategyIT {
     PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
     PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");
     
-    MapsideJoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>();
+    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer,
String, String>();
     PTable<Integer, String> custOrders = mapsideJoin.join(customerTable, orderTable,
JoinType.LEFT_OUTER_JOIN)
-        .parallelDo("concat", new ConcatValuesFn(), Writables.tableOf(Writables.ints(), Writables.strings()));
+        .mapValues("concat", new ConcatValuesFn(), Writables.strings());
 
-    PTable<Integer, String> ORDER_TABLE = orderTable.parallelDo(new CapOrdersFn(),
orderTable.getPTableType());
+    PTable<Integer, String> ORDER_TABLE = orderTable.mapValues(new CapOrdersFn(), orderTable.getValueType());
     
     PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(custOrders,
ORDER_TABLE, JoinType.LEFT_OUTER_JOIN);
 
@@ -194,7 +193,8 @@ public class MapsideJoinStrategyIT {
 
   private PTable<Integer, String> readTable(Pipeline pipeline, String filename) {
     try {
-      return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable",
new LineSplitter(),
+      return pipeline.readTextFile(tmpDir.copyResourceFileName(filename)).parallelDo("asTable",
+          new LineSplitter(),
           Writables.tableOf(Writables.ints(), Writables.strings()));
     } catch (IOException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
index 70f0b01..0e2ef38 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchMapper.java
@@ -17,12 +17,10 @@
  */
 package org.apache.crunch.impl.mr.run;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
 import org.apache.hadoop.mapreduce.Mapper;
 
 public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
@@ -35,21 +33,19 @@ public class CrunchMapper extends Mapper<Object, Object, Object, Object> {
 
   @Override
   protected void setup(Mapper<Object, Object, Object, Object>.Context context) {
-    List<RTNode> nodes;
-    this.ctxt = new CrunchTaskContext(context, NodeContext.MAP);
-    try {
-      nodes = ctxt.getNodes();
-    } catch (IOException e) {
-      LOG.info("Crunch deserialization error", e);
-      throw new CrunchRuntimeException(e);
+    if (ctxt == null) {
+      ctxt = new CrunchTaskContext(context, NodeContext.MAP);
+      this.debug = ctxt.isDebugRun();
     }
+    
+    List<RTNode> nodes = ctxt.getNodes();
     if (nodes.size() == 1) {
       this.node = nodes.get(0);
     } else {
       CrunchInputSplit split = (CrunchInputSplit) context.getInputSplit();
       this.node = nodes.get(split.getNodeIndex());
     }
-    this.debug = ctxt.isDebugRun();
+    this.node.initialize(ctxt);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
index e5ddbd2..c3e3e3e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchReducer.java
@@ -17,12 +17,8 @@
  */
 package org.apache.crunch.impl.mr.run;
 
-import java.io.IOException;
-import java.util.List;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.impl.SingleUseIterable;
 import org.apache.hadoop.mapreduce.Reducer;
 
@@ -40,15 +36,12 @@ public class CrunchReducer extends Reducer<Object, Object, Object, Object> {
 
   @Override
   protected void setup(Reducer<Object, Object, Object, Object>.Context context) {
-    this.ctxt = new CrunchTaskContext(context, getNodeContext());
-    try {
-      List<RTNode> nodes = ctxt.getNodes();
-      this.node = nodes.get(0);
-    } catch (IOException e) {
-      LOG.info("Crunch deserialization error", e);
-      throw new CrunchRuntimeException(e);
+    if (ctxt == null) {
+      this.ctxt = new CrunchTaskContext(context, getNodeContext());
+      this.debug = ctxt.isDebugRun();
     }
-    this.debug = ctxt.isDebugRun();
+    this.node = ctxt.getNodes().get(0);
+    this.node.initialize(ctxt);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
index c4f2873..b81df05 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -32,11 +32,21 @@ class CrunchTaskContext {
 
   private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
   private final NodeContext nodeContext;
+  private final List<RTNode> nodes;
   private CrunchOutputs<Object, Object> multipleOutputs;
-
-  public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext,
NodeContext nodeContext) {
+  
+  public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext,
+      NodeContext nodeContext) {
     this.taskContext = taskContext;
     this.nodeContext = nodeContext;
+    Configuration conf = taskContext.getConfiguration();
+    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)),
+        nodeContext.toString());
+    try {
+      this.nodes = (List<RTNode>) DistCache.read(conf, path);
+    } catch (IOException e) {
+      throw new CrunchRuntimeException("Could not read runtime node information", e);
+    }
   }
 
   public TaskInputOutputContext<Object, Object, Object, Object> getContext() {
@@ -47,16 +57,7 @@ class CrunchTaskContext {
     return nodeContext;
   }
 
-  public List<RTNode> getNodes() throws IOException {
-    Configuration conf = taskContext.getConfiguration();
-    Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)),
nodeContext.toString());
-    @SuppressWarnings("unchecked")
-    List<RTNode> nodes = (List<RTNode>) DistCache.read(conf, path);
-    if (nodes != null) {
-      for (RTNode node : nodes) {
-        node.initialize(this);
-      }
-    }
+  public List<RTNode> getNodes() {
     return nodes;
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/4983a0ca/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index ce7b795..fd7697c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -45,9 +45,13 @@ public class RTNode implements Serializable {
 
   private transient Emitter<Object> emitter;
 
-  public RTNode(DoFn<Object, Object> fn, PType<Object> outputPType, String name,
List<RTNode> children,
+  public RTNode(DoFn<Object, Object> fn,
+      PType<Object> outputPType,
+      String name,
+      List<RTNode> children,
       Converter inputConverter,
-      Converter outputConverter, String outputName) {
+      Converter outputConverter,
+      String outputName) {
     this.fn = fn;
     this.outputPType = outputPType;
     this.nodeName = name;


Mime
View raw message