beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/3] beam git commit: Migrate DirectRunner one-to-one factories
Date Wed, 22 Feb 2017 17:36:41 GMT
Repository: beam
Updated Branches:
  refs/heads/master 453e37bc6 -> fbaac0fc8


Migrate DirectRunner one-to-one factories

Use SingleInputOutputOverrideFactory to reduce prevalence of
boilerplate.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a9361fa6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a9361fa6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a9361fa6

Branch: refs/heads/master
Commit: a9361fa6b68d3d6f6333872a32fd5f923f9f9673
Parents: 926385c
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Feb 21 09:55:47 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Feb 22 09:36:17 2017 -0800

----------------------------------------------------------------------
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 23 ++-----------------
 .../direct/DirectGroupByKeyOverrideFactory.java | 22 ++----------------
 .../ParDoSingleViaMultiOverrideFactory.java     | 24 +++-----------------
 .../runners/direct/ViewEvaluatorFactory.java    | 24 +++-----------------
 .../DirectGroupByKeyOverrideFactoryTest.java    |  8 +++----
 .../ParDoSingleViaMultiOverrideFactoryTest.java |  3 ++-
 6 files changed, 16 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index a957a17..bb90a6c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -17,26 +17,19 @@
  */
 package org.apache.beam.runners.direct;
 
-import com.google.common.collect.Iterables;
-import java.util.List;
-import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * Provides an implementation of {@link SplittableParDo.GBKIntoKeyedWorkItems} for the Direct
  * Runner.
  */
 class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
-    implements PTransformOverrideFactory<
+    extends SingleInputOutputOverrideFactory<
         PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>,
         GBKIntoKeyedWorkItems<KeyT, InputT>> {
   @Override
@@ -44,16 +37,4 @@ class DirectGBKIntoKeyedWorkItemsOverrideFactory<KeyT, InputT>
       getReplacementTransform(GBKIntoKeyedWorkItems<KeyT, InputT> transform) {
     return new DirectGroupByKey.DirectGroupByKeyOnly<>();
   }
-
-  @Override
-  public PCollection<KV<KeyT, InputT>> getInput(
-      List<TaggedPValue> inputs, Pipeline p) {
-    return (PCollection<KV<KeyT, InputT>>) Iterables.getOnlyElement(inputs).getValue();
-  }
-
-  @Override
-  public Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, PCollection<KeyedWorkItem<KeyT, InputT>> newOutput) {
-    return ReplacementOutputs.singleton(outputs, newOutput);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index 1651987..f3b718f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -17,38 +17,20 @@
  */
 package org.apache.beam.runners.direct;
 
-import com.google.common.collect.Iterables;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 
 /** A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms. */
 final class DirectGroupByKeyOverrideFactory<K, V>
-    implements PTransformOverrideFactory<
+    extends SingleInputOutputOverrideFactory<
         PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>, GroupByKey<K, V>> {
   @Override
   public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> getReplacementTransform(
       GroupByKey<K, V> transform) {
     return new DirectGroupByKey<>(transform);
   }
-
-  @Override
-  public PCollection<KV<K, V>> getInput(
-      List<TaggedPValue> inputs, Pipeline p) {
-    return (PCollection<KV<K, V>>) Iterables.getOnlyElement(inputs).getValue();
-  }
-
-  @Override
-  public Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, PCollection<KV<K, Iterable<V>>> newOutput) {
-    return ReplacementOutputs.singleton(outputs, newOutput);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 990efb3..f859729 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -17,19 +17,13 @@
  */
 package org.apache.beam.runners.direct;
 
-import com.google.common.collect.Iterables;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
-import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.Bound;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
@@ -38,26 +32,14 @@ import org.apache.beam.sdk.values.TupleTagList;
  * it in terms of multi-output {@link ParDo}.
  */
 class ParDoSingleViaMultiOverrideFactory<InputT, OutputT>
-    implements PTransformOverrideFactory<
-        PCollection<? extends InputT>, PCollection<OutputT>, Bound<InputT, OutputT>> {
+    extends SingleInputOutputOverrideFactory<
+            PCollection<? extends InputT>, PCollection<OutputT>, Bound<InputT, OutputT>> {
   @Override
   public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> getReplacementTransform(
       Bound<InputT, OutputT> transform) {
     return new ParDoSingleViaMulti<>(transform);
   }
 
-  @Override
-  public PCollection<? extends InputT> getInput(
-      List<TaggedPValue> inputs, Pipeline p) {
-    return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue();
-  }
-
-  @Override
-  public Map<PValue, ReplacementOutput> mapOutputs(
-      List<TaggedPValue> outputs, PCollection<OutputT> newOutput) {
-    return ReplacementOutputs.singleton(outputs, newOutput);
-  }
-
   static class ParDoSingleViaMulti<InputT, OutputT>
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     private static final String MAIN_OUTPUT_TAG = "main";

http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 49faaa9..0fa6254 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -20,15 +20,12 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.construction.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;
-import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -38,8 +35,6 @@ import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
@@ -102,26 +97,13 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory {
   }
 
   public static class ViewOverrideFactory<ElemT, ViewT>
-      implements PTransformOverrideFactory<
-          PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
-
+      extends SingleInputOutputOverrideFactory<
+                PCollection<ElemT>, PCollectionView<ViewT>, CreatePCollectionView<ElemT, ViewT>> {
     @Override
     public PTransform<PCollection<ElemT>, PCollectionView<ViewT>> getReplacementTransform(
         CreatePCollectionView<ElemT, ViewT> transform) {
       return new DirectCreatePCollectionView<>(transform);
     }
-
-    @Override
-    public PCollection<ElemT> getInput(
-        List<TaggedPValue> inputs, Pipeline p) {
-      return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue();
-    }
-
-    @Override
-    public Map<PValue, ReplacementOutput> mapOutputs(
-        List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) {
-      return ReplacementOutputs.singleton(outputs, newOutput);
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java
index 03f1dda..c9fdda0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactoryTest.java
@@ -32,12 +32,12 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests for {@link DirectGBKIntoKeyedWorkItemsOverrideFactory}.
- */
+/** Tests for {@link DirectGBKIntoKeyedWorkItemsOverrideFactory}. */
 @RunWith(JUnit4.class)
 public class DirectGroupByKeyOverrideFactoryTest {
-  private DirectGroupByKeyOverrideFactory factory = new DirectGroupByKeyOverrideFactory();
+  private DirectGroupByKeyOverrideFactory<String, Integer> factory =
+      new DirectGroupByKeyOverrideFactory<>();
+
   @Test
   public void getInputSucceeds() {
     TestPipeline p = TestPipeline.create();

http://git-wip-us.apache.org/repos/asf/beam/blob/a9361fa6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java
index 8f170dd..59577a8 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactoryTest.java
@@ -33,7 +33,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class ParDoSingleViaMultiOverrideFactoryTest {
-  private ParDoSingleViaMultiOverrideFactory factory = new ParDoSingleViaMultiOverrideFactory();
+  private ParDoSingleViaMultiOverrideFactory<Integer, Integer> factory =
+      new ParDoSingleViaMultiOverrideFactory<>();
 
   @Test
   public void getInputSucceeds() {


Mime
View raw message