Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 8B048200BE7 for ; Tue, 20 Dec 2016 21:40:28 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 89B3C160B12; Tue, 20 Dec 2016 20:40:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 85A64160B33 for ; Tue, 20 Dec 2016 21:40:27 +0100 (CET) Received: (qmail 85787 invoked by uid 500); 20 Dec 2016 20:40:26 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 85768 invoked by uid 99); 20 Dec 2016 20:40:26 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Dec 2016 20:40:26 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 39797C00A6 for ; Tue, 20 Dec 2016 20:40:26 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.219 X-Spam-Level: X-Spam-Status: No, score=-6.219 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id O6d_2-UswP0t for ; Tue, 20 Dec 2016 20:40:24 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 7A7375FCBE for ; Tue, 20 Dec 2016 20:40:23 +0000 (UTC) Received: (qmail 85442 invoked by uid 99); 20 Dec 2016 20:40:22 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 20 Dec 2016 20:40:22 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A1D90DFB86; Tue, 20 Dec 2016 20:40:22 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Tue, 20 Dec 2016 20:40:22 -0000 Message-Id: <8487a2c4f41043bfabe18d9d4f8bf8ee@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/5] incubator-beam git commit: Add some key-preserving to KeyedPValueTrackingVisitor archived-at: Tue, 20 Dec 2016 20:40:28 -0000 Repository: incubator-beam Updated Branches: refs/heads/master a526adb33 -> 2f4b80312 Add some key-preserving to KeyedPValueTrackingVisitor Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/81702e67 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/81702e67 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/81702e67 Branch: refs/heads/master Commit: 81702e67b92a23849cbc8f4a16b2a619e4b477a1 Parents: 22e25a4 Author: Kenneth Knowles Authored: Thu Dec 8 11:49:15 2016 -0800 Committer: Kenneth Knowles Committed: Tue Dec 20 11:18:02 2016 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/DirectRunner.java | 9 +-- .../direct/KeyedPValueTrackingVisitor.java | 35 +++++--- .../direct/KeyedPValueTrackingVisitorTest.java | 84 +++----------------- 3 files changed, 37 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81702e67/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 78163c0..afa43ff 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -31,8 +31,6 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.runners.core.SplittableParDo; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; -import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult; import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory; import org.apache.beam.runners.direct.ViewEvaluatorFactory.ViewOverrideFactory; @@ -306,12 +304,7 @@ public class DirectRunner extends PipelineRunner { graphVisitor.finishSpecifyingRemainder(); @SuppressWarnings("rawtypes") - KeyedPValueTrackingVisitor keyedPValueVisitor = - KeyedPValueTrackingVisitor.create( - ImmutableSet.of( - SplittableParDo.GBKIntoKeyedWorkItems.class, - DirectGroupByKeyOnly.class, - DirectGroupAlsoByWindow.class)); + KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create(); pipeline.traverseTopologically(keyedPValueVisitor); DisplayDataValidator.validatePipeline(pipeline); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81702e67/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 7f85169..e91a768 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -18,9 +18,15 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Predicates.in; +import static com.google.common.collect.Iterables.all; +import com.google.common.collect.ImmutableSet; import java.util.HashSet; import java.util.Set; +import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; +import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.GroupByKey; @@ -38,19 +44,21 @@ import org.apache.beam.sdk.values.PValue; // TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms // unkeyed class KeyedPValueTrackingVisitor implements PipelineVisitor { - @SuppressWarnings("rawtypes") - private final Set> producesKeyedOutputs; + + private static final Set> PRODUCES_KEYED_OUTPUTS = + ImmutableSet.of( + SplittableParDo.GBKIntoKeyedWorkItems.class, + DirectGroupByKeyOnly.class, + DirectGroupAlsoByWindow.class); + private final Set keyedValues; private boolean finalized; - public static KeyedPValueTrackingVisitor create( - @SuppressWarnings("rawtypes") Set> producesKeyedOutputs) { - return new KeyedPValueTrackingVisitor(producesKeyedOutputs); + public static KeyedPValueTrackingVisitor create() { + return new KeyedPValueTrackingVisitor(); } - private KeyedPValueTrackingVisitor( - @SuppressWarnings("rawtypes") Set> producesKeyedOutputs) { - this.producesKeyedOutputs = producesKeyedOutputs; + private KeyedPValueTrackingVisitor() { this.keyedValues = new HashSet<>(); } @@ -73,7 +81,7 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { node); if (node.isRootNode()) { finalized = true; - } else if (producesKeyedOutputs.contains(node.getTransform().getClass())) { + } else if (PRODUCES_KEYED_OUTPUTS.contains(node.getTransform().getClass())) { keyedValues.addAll(node.getOutputs()); } } @@ -83,7 +91,9 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { - if (producesKeyedOutputs.contains(producer.getTransform().getClass())) { + if (PRODUCES_KEYED_OUTPUTS.contains(producer.getTransform().getClass()) + || (isKeyPreserving(producer.getTransform()) + && all(producer.getInputs(), in(keyedValues)))) { keyedValues.add(value); } } @@ -93,4 +103,9 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { finalized, "can't call getKeyedPValues before a Pipeline has been completely traversed"); return keyedValues; } + + private static boolean isKeyPreserving(PTransform transform) { + // There are currently no key-preserving transforms; this lays the infrastructure for them + return false; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/81702e67/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java index eef3375..a357005 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java @@ -21,9 +21,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableSet; import java.util.Collections; -import java.util.Set; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -33,7 +31,6 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -57,54 +54,20 @@ public class KeyedPValueTrackingVisitorTest { @Before public void setup() { - - @SuppressWarnings("rawtypes") - Set> producesKeyed = - ImmutableSet.>of(PrimitiveKeyer.class, CompositeKeyer.class); - visitor = KeyedPValueTrackingVisitor.create(producesKeyed); - } - - @Test - public void primitiveProducesKeyedOutputUnkeyedInputKeyedOutput() { - PCollection keyed = - p.apply(Create.of(1, 2, 3)).apply(new PrimitiveKeyer()); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), hasItem(keyed)); - } - - @Test - public void primitiveProducesKeyedOutputKeyedInputKeyedOutut() { - PCollection keyed = - p.apply(Create.of(1, 2, 3)) - .apply("firstKey", new PrimitiveKeyer()) - .apply("secondKey", new PrimitiveKeyer()); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), hasItem(keyed)); - } - - @Test - public void compositeProducesKeyedOutputUnkeyedInputKeyedOutput() { - PCollection keyed = - p.apply(Create.of(1, 2, 3)).apply(new CompositeKeyer()); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), hasItem(keyed)); + p = TestPipeline.create(); + visitor = KeyedPValueTrackingVisitor.create(); } @Test - public void compositeProducesKeyedOutputKeyedInputKeyedOutut() { - PCollection keyed = - p.apply(Create.of(1, 2, 3)) - .apply("firstKey", new CompositeKeyer()) - .apply("secondKey", new CompositeKeyer()); + public void groupByKeyProducesKeyedOutput() { + PCollection>> keyed = + p.apply(Create.of(KV.of("foo", 3))) + .apply(GroupByKey.create()); p.traverseTopologically(visitor); assertThat(visitor.getKeyedPValues(), hasItem(keyed)); } - @Test public void noInputUnkeyedOutput() { PCollection>> unkeyed = @@ -117,26 +80,17 @@ public class KeyedPValueTrackingVisitorTest { } @Test - public void keyedInputNotProducesKeyedOutputUnkeyedOutput() { - PCollection onceKeyed = - p.apply(Create.of(1, 2, 3)) - .apply(new PrimitiveKeyer()) - .apply(ParDo.of(new IdentityFn())); + public void keyedInputWithoutKeyPreserving() { + PCollection>> onceKeyed = + p.apply(Create.of(KV.of("hello", 42))) + .apply(GroupByKey.create()) + .apply(ParDo.of(new IdentityFn>>())); p.traverseTopologically(visitor); assertThat(visitor.getKeyedPValues(), not(hasItem(onceKeyed))); } @Test - public void unkeyedInputNotProducesKeyedOutputUnkeyedOutput() { - PCollection unkeyed = - p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new IdentityFn())); - - p.traverseTopologically(visitor); - assertThat(visitor.getKeyedPValues(), not(hasItem(unkeyed))); - } - - @Test public void traverseMultipleTimesThrows() { p.apply( Create.>of( @@ -161,22 +115,6 @@ public class KeyedPValueTrackingVisitorTest { visitor.getKeyedPValues(); } - private static class PrimitiveKeyer extends PTransform, PCollection> { - @Override - public PCollection expand(PCollection input) { - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()) - .setCoder(input.getCoder()); - } - } - - private static class CompositeKeyer extends PTransform, PCollection> { - @Override - public PCollection expand(PCollection input) { - return input.apply(new PrimitiveKeyer()).apply(ParDo.of(new IdentityFn())); - } - } - private static class IdentityFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) throws Exception {