Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 50C15200C23 for ; Wed, 22 Feb 2017 18:36:43 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4F807160B49; Wed, 22 Feb 2017 17:36:43 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 556AE160B62 for ; Wed, 22 Feb 2017 18:36:42 +0100 (CET) Received: (qmail 77319 invoked by uid 500); 22 Feb 2017 17:36:41 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 77292 invoked by uid 99); 22 Feb 2017 17:36:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Feb 2017 17:36:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 47951DFF13; Wed, 22 Feb 2017 17:36:41 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tgroh@apache.org To: commits@beam.apache.org Date: Wed, 22 Feb 2017 17:36:41 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] beam git commit: Migrate DirectRunner one-to-one factories archived-at: Wed, 22 Feb 2017 17:36:43 -0000 Repository: beam Updated Branches: refs/heads/master 453e37bc6 -> fbaac0fc8 Migrate DirectRunner one-to-one factories Use SingleInputOutputOverrideFactory to reduce prevalence of boilerplate. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9361fa6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9361fa6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9361fa6 Branch: refs/heads/master Commit: a9361fa6b68d3d6f6333872a32fd5f923f9f9673 Parents: 926385c Author: Thomas Groh Authored: Tue Feb 21 09:55:47 2017 -0800 Committer: Thomas Groh Committed: Wed Feb 22 09:36:17 2017 -0800 ---------------------------------------------------------------------- ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 23 ++----------------- .../direct/DirectGroupByKeyOverrideFactory.java | 22 ++---------------- .../ParDoSingleViaMultiOverrideFactory.java | 24 +++----------------- .../runners/direct/ViewEvaluatorFactory.java | 24 +++----------------- .../DirectGroupByKeyOverrideFactoryTest.java | 8 +++---- .../ParDoSingleViaMultiOverrideFactoryTest.java | 3 ++- 6 files changed, 16 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java index a957a17..bb90a6c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java @@ -17,26 +17,19 @@ */ package org.apache.beam.runners.direct; -import com.google.common.collect.Iterables; -import java.util.List; -import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** * Provides an implementation of {@link SplittableParDo.GBKIntoKeyedWorkItems} for the Direct * Runner. */ class DirectGBKIntoKeyedWorkItemsOverrideFactory - implements PTransformOverrideFactory< + extends SingleInputOutputOverrideFactory< PCollection>, PCollection>, GBKIntoKeyedWorkItems> { @Override @@ -44,16 +37,4 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory getReplacementTransform(GBKIntoKeyedWorkItems transform) { return new DirectGroupByKey.DirectGroupByKeyOnly<>(); } - - @Override - public PCollection> getInput( - List inputs, Pipeline p) { - return (PCollection>) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map mapOutputs( - List outputs, PCollection> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java index 1651987..f3b718f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java @@ -17,38 +17,20 @@ */ package org.apache.beam.runners.direct; -import com.google.common.collect.Iterables; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */ final class DirectGroupByKeyOverrideFactory - implements PTransformOverrideFactory< + extends SingleInputOutputOverrideFactory< PCollection>, PCollection>>, GroupByKey> { @Override public PTransform>, PCollection>>> getReplacementTransform( GroupByKey transform) { return new DirectGroupByKey<>(transform); } - - @Override - public PCollection> getInput( - List inputs, Pipeline p) { - return (PCollection>) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map mapOutputs( - List outputs, PCollection>> newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java index 990efb3..f859729 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java @@ -17,19 +17,13 @@ */ package org.apache.beam.runners.direct; -import com.google.common.collect.Iterables; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.ReplacementOutputs; -import org.apache.beam.sdk.Pipeline; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -38,26 +32,14 @@ import org.apache.beam.sdk.values.TupleTagList; * it in terms of multi-output {@link ParDo}. */ class ParDoSingleViaMultiOverrideFactory - implements PTransformOverrideFactory< - PCollection, PCollection, Bound> { + extends SingleInputOutputOverrideFactory< + PCollection, PCollection, Bound> { @Override public PTransform, PCollection> getReplacementTransform( Bound transform) { return new ParDoSingleViaMulti<>(transform); } - @Override - public PCollection getInput( - List inputs, Pipeline p) { - return (PCollection) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map mapOutputs( - List outputs, PCollection newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } - static class ParDoSingleViaMulti extends PTransform, PCollection> { private static final String MAIN_OUTPUT_TAG = "main"; http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index 49faaa9..0fa6254 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -20,15 +20,12 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.construction.ReplacementOutputs; +import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.direct.CommittedResult.OutputType; import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter; import org.apache.beam.runners.direct.StepTransformResult.Builder; -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; @@ -38,8 +35,6 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the @@ -102,26 +97,13 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { } public static class ViewOverrideFactory - implements PTransformOverrideFactory< - PCollection, PCollectionView, CreatePCollectionView> { - + extends SingleInputOutputOverrideFactory< + PCollection, PCollectionView, CreatePCollectionView> { @Override public PTransform, PCollectionView> getReplacementTransform( CreatePCollectionView transform) { return new DirectCreatePCollectionView<>(transform); } - - @Override - public PCollection getInput( - List inputs, Pipeline p) { - return (PCollection) Iterables.getOnlyElement(inputs).getValue(); - } - - @Override - public Map mapOutputs( - List outputs, PCollectionView newOutput) { - return ReplacementOutputs.singleton(outputs, newOutput); - } } /** http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java index 03f1dda..c9fdda0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java @@ -32,12 +32,12 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link DirectGBKIntoKeyedWorkItemsOverrideFactory}. - */ +/** Tests for {@link DirectGBKIntoKeyedWorkItemsOverrideFactory}. */ @RunWith(JUnit4.class) public class DirectGroupByKeyOverrideFactoryTest { - private DirectGroupByKeyOverrideFactory factory = new DirectGroupByKeyOverrideFactory(); + private DirectGroupByKeyOverrideFactory factory = + new DirectGroupByKeyOverrideFactory<>(); + @Test public void getInputSucceeds() { TestPipeline p = TestPipeline.create(); http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java index 8f170dd..59577a8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java @@ -33,7 +33,8 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class ParDoSingleViaMultiOverrideFactoryTest { - private ParDoSingleViaMultiOverrideFactory factory = new ParDoSingleViaMultiOverrideFactory(); + private ParDoSingleViaMultiOverrideFactory factory = + new ParDoSingleViaMultiOverrideFactory<>(); @Test public void getInputSucceeds() {