Return-Path: X-Original-To: apmail-crunch-commits-archive@www.apache.org Delivered-To: apmail-crunch-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B76D710DA7 for ; Sat, 21 Sep 2013 20:00:38 +0000 (UTC) Received: (qmail 29666 invoked by uid 500); 21 Sep 2013 20:00:38 -0000 Delivered-To: apmail-crunch-commits-archive@crunch.apache.org Received: (qmail 29622 invoked by uid 500); 21 Sep 2013 20:00:36 -0000 Mailing-List: contact commits-help@crunch.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@crunch.apache.org Delivered-To: mailing list commits@crunch.apache.org Received: (qmail 29615 invoked by uid 99); 21 Sep 2013 20:00:35 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 21 Sep 2013 20:00:35 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A78568B03CF; Sat, 21 Sep 2013 20:00:34 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jwills@apache.org To: commits@crunch.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: CRUNCH-269: Add option for disabling deep copies on intermediate outputs from DoFns. Date: Sat, 21 Sep 2013 20:00:34 +0000 (UTC) Updated Branches: refs/heads/master 013f2e19a -> 14f0c16b5 CRUNCH-269: Add option for disabling deep copies on intermediate outputs from DoFns. 
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/14f0c16b Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/14f0c16b Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/14f0c16b Branch: refs/heads/master Commit: 14f0c16b5bcf6496bedcf184d22c10462ababe7e Parents: 013f2e1 Author: Josh Wills Authored: Fri Sep 20 16:33:08 2013 -0700 Committer: Josh Wills Committed: Fri Sep 20 16:33:08 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/DoFn.java | 13 ++++++++++ .../impl/mr/emit/IntermediateEmitter.java | 5 ++-- .../org/apache/crunch/impl/mr/run/RTNode.java | 8 ++++--- .../crunch/impl/mr/run/RuntimeParameters.java | 2 ++ .../impl/mr/emit/IntermediateEmitterTest.java | 25 ++++++++++++++++++-- 5 files changed, 46 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/DoFn.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/DoFn.java b/crunch-core/src/main/java/org/apache/crunch/DoFn.java index 6da89ef..6ae89a4 100644 --- a/crunch-core/src/main/java/org/apache/crunch/DoFn.java +++ b/crunch-core/src/main/java/org/apache/crunch/DoFn.java @@ -119,6 +119,19 @@ public abstract class DoFn implements Serializable { return 1.2f; } + /** + * By default, Crunch will do a defensive deep copy of the outputs of a + * DoFn when there are multiple downstream consumers of that item, in order to + * prevent the downstream functions from making concurrent modifications to + * data objects. This introduces some extra overhead in cases where you know + * that the downstream code is only reading the objects and not modifying them, + * so you can disable this feature by overriding this function to + * return {@code true}. 
+ */ + public boolean disableDeepCopy() { + return false; + } + protected TaskInputOutputContext getContext() { return context; } http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java index b6df98b..955aed8 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/emit/IntermediateEmitter.java @@ -39,13 +39,14 @@ public class IntermediateEmitter implements Emitter { private final PType outputPType; private final boolean needDetachedValues; - public IntermediateEmitter(PType outputPType, List children, Configuration conf) { + public IntermediateEmitter(PType outputPType, List children, Configuration conf, + boolean disableDeepCopy) { this.outputPType = outputPType; this.children = ImmutableList.copyOf(children); this.conf = conf; outputPType.initialize(conf); - needDetachedValues = this.children.size() > 1; + needDetachedValues = !disableDeepCopy && this.children.size() > 1; } public void emit(Object emitted) { http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java index fd7697c..f17beb0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -30,6 +30,8 @@ import org.apache.crunch.impl.mr.emit.MultipleOutputEmitter; import 
org.apache.crunch.impl.mr.emit.OutputEmitter; import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.TaskInputOutputContext; public class RTNode implements Serializable { @@ -66,7 +68,6 @@ public class RTNode implements Serializable { // Already initialized return; } - fn.setContext(ctxt.getContext()); fn.initialize(); for (RTNode child : children) { @@ -81,8 +82,9 @@ public class RTNode implements Serializable { this.emitter = new OutputEmitter(outputConverter, ctxt.getContext()); } } else if (!children.isEmpty()) { - this.emitter = new IntermediateEmitter(outputPType, children, - ctxt.getContext().getConfiguration()); + Configuration conf = ctxt.getContext().getConfiguration(); + boolean disableDeepCopy = conf.getBoolean(RuntimeParameters.DISABLE_DEEP_COPY, false); + this.emitter = new IntermediateEmitter(outputPType, children, conf, disableDeepCopy || fn.disableDeepCopy()); } else { throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName); } http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 7dc8521..a8e8aff 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -34,6 +34,8 @@ public class RuntimeParameters { public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; + public static final String DISABLE_DEEP_COPY = "crunch.disable.deep.copy"; + // Not instantiated private RuntimeParameters() { } 
http://git-wip-us.apache.org/repos/asf/crunch/blob/14f0c16b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java index dd72364..0971211 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/emit/IntermediateEmitterTest.java @@ -50,7 +50,7 @@ public class IntermediateEmitterTest { public void testEmit_SingleChild() { RTNode singleChild = mock(RTNode.class); IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(singleChild), - new Configuration()); + new Configuration(), false); emitter.emit(stringWrapper); ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(StringWrapper.class); @@ -63,7 +63,7 @@ public class IntermediateEmitterTest { RTNode childA = mock(RTNode.class); RTNode childB = mock(RTNode.class); IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB), - new Configuration()); + new Configuration(), false); emitter.emit(stringWrapper); ArgumentCaptor argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class); @@ -80,4 +80,25 @@ public class IntermediateEmitterTest { assertNotSame(stringWrapper, argumentCaptorB.getValue()); } + @Test + public void testEmit_MultipleChildrenDisableDeepCopy() { + RTNode childA = mock(RTNode.class); + RTNode childB = mock(RTNode.class); + IntermediateEmitter emitter = new IntermediateEmitter(ptype, Lists.newArrayList(childA, childB), + new Configuration(), true); + emitter.emit(stringWrapper); + + ArgumentCaptor argumentCaptorA = ArgumentCaptor.forClass(StringWrapper.class); + ArgumentCaptor argumentCaptorB = ArgumentCaptor.forClass(StringWrapper.class); + + 
verify(childA).process(argumentCaptorA.capture()); + verify(childB).process(argumentCaptorB.capture()); + + assertEquals(stringWrapper, argumentCaptorA.getValue()); + assertEquals(stringWrapper, argumentCaptorB.getValue()); + + // Make sure that multiple children without deep copies are the same instance + assertSame(stringWrapper, argumentCaptorA.getValue()); + assertSame(stringWrapper, argumentCaptorB.getValue()); + } }