crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-269: Add option for disabling deep copies on intermediate outputs from DoFns.
Date Sat, 21 Sep 2013 20:00:34 GMT
Updated Branches:
  refs/heads/master 013f2e19a -> 14f0c16b5


CRUNCH-269: Add option for disabling deep copies on intermediate outputs from DoFns.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/14f0c16b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/14f0c16b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/14f0c16b

Branch: refs/heads/master
Commit: 14f0c16b5bcf6496bedcf184d22c10462ababe7e
Parents: 013f2e1
Author: Josh Wills <jwills@apache.org>
Authored: Fri Sep 20 16:33:08 2013 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Fri Sep 20 16:33:08 2013 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/crunch/DoFn.java   | 13 ++++++++++
 .../impl/mr/emit/IntermediateEmitter.java       |  5 ++--
 .../org/apache/crunch/impl/mr/run/RTNode.java   |  8 ++++---
 .../crunch/impl/mr/run/RuntimeParameters.java   |  2 ++
 .../impl/mr/emit/IntermediateEmitterTest.java   | 25 ++++++++++++++++++--
 5 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
index 6da89ef..6ae89a4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java
+++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java
@@ -119,6 +119,19 @@ public abstract class DoFn<S, T> implements Serializable {
     return 1.2f;
   }
 
+  /**
+   * By default, Crunch will do a defensive deep copy of the outputs of a
+   * DoFn when there are multiple downstream consumers of that item, in order to
+   * prevent the downstream functions from making concurrent modifications to
+   * data objects. This introduces some extra overhead in cases where you know
+   * that the downstream code is only reading the objects and not modifying it,
+   * so you can disable this feature by overriding this function to
+   * return {@code true}.
+   */
+  public boolean disableDeepCopy() {
+    return false;
+  }
+
   protected TaskInputOutputContext<?, ?, ?, ?> getContext() {
     return context;
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
index b6df98b..955aed8 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java
@@ -39,13 +39,14 @@ public class IntermediateEmitter implements Emitter<Object> {
   private final PType<Object> outputPType;
   private final boolean needDetachedValues;
 
-  public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children, Configuration conf) {
+  public IntermediateEmitter(PType<Object> outputPType, List<RTNode> children, Configuration conf,
+                             boolean disableDeepCopy) {
     this.outputPType = outputPType;
     this.children = ImmutableList.copyOf(children);
     this.conf = conf;
 
     outputPType.initialize(conf);
-    needDetachedValues = this.children.size() > 1;
+    needDetachedValues = !disableDeepCopy && this.children.size() > 1;
   }
 
   public void emit(Object emitted) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index fd7697c..f17beb0 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -30,6 +30,8 @@ import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter;
 import org.apache.crunch.impl.mr.emit.OutputEmitter;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
 
 public class RTNode implements Serializable {
 
@@ -66,7 +68,6 @@ public class RTNode implements Serializable {
       // Already initialized
       return;
     }
-
     fn.setContext(ctxt.getContext());
     fn.initialize();
     for (RTNode child : children) {
@@ -81,8 +82,9 @@ public class RTNode implements Serializable {
         this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
       }
     } else if (!children.isEmpty()) {
-      this.emitter = new IntermediateEmitter(outputPType, children,
-          ctxt.getContext().getConfiguration());
+      Configuration conf = ctxt.getContext().getConfiguration();
+      boolean disableDeepCopy = conf.getBoolean(RuntimeParameters.DISABLE_DEEP_COPY, false);
+      this.emitter = new IntermediateEmitter(outputPType, children, conf, disableDeepCopy || fn.disableDeepCopy());
     } else {
       throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
     }

http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 7dc8521..a8e8aff 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -34,6 +34,8 @@ public class RuntimeParameters {
 
   public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist";
 
+  public static final String DISABLE_DEEP_COPY = "crunch.disable.deep.copy";
+
   // Not instantiated
   private RuntimeParameters() {
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
index dd72364..0971211 100644
--- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java
@@ -50,7 +50,7 @@ public class IntermediateEmitterTest {
   public void testEmit_SingleChild() {
     RTNode singleChild = mock(RTNode.class);
     IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild),
-        new Configuration());
+        new Configuration(), false);
     emitter.emit(stringWrapper);
 
     ArgumentCaptor<StringWrapper> argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class);
@@ -63,7 +63,7 @@ public class IntermediateEmitterTest {
     RTNode childA = mock(RTNode.class);
     RTNode childB = mock(RTNode.class);
     IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB),
-        new Configuration());
+        new Configuration(), false);
     emitter.emit(stringWrapper);
 
     ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class);
@@ -80,4 +80,25 @@ public class IntermediateEmitterTest {
     assertNotSame(stringWrapper, argumentCaptorB.getValue());
   }
 
+  @Test
+  public void testEmit_MultipleChildrenDisableDeepCopy() {
+    RTNode childA = mock(RTNode.class);
+    RTNode childB = mock(RTNode.class);
+    IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB),
+        new Configuration(), true);
+    emitter.emit(stringWrapper);
+
+    ArgumentCaptor<StringWrapper> argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class);
+    ArgumentCaptor<StringWrapper> argumentCaptorB = ArgumentCaptor.forClass(StringWrapper.class);
+
+    verify(childA).process(argumentCaptorA.capture());
+    verify(childB).process(argumentCaptorB.capture());
+
+    assertEquals(stringWrapper, argumentCaptorA.getValue());
+    assertEquals(stringWrapper, argumentCaptorB.getValue());
+
+    // Make sure that multiple children without deep copies are the same instance
+    assertSame(stringWrapper, argumentCaptorA.getValue());
+    assertSame(stringWrapper, argumentCaptorB.getValue());
+  }
 }


Mime
View raw message