beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [19/50] [abbrv] incubator-beam git commit: Handle Undeclared Side Outputs in ParDoInProcessEvaluator
Date Fri, 26 Feb 2016 22:54:56 GMT
Handle Undeclared Side Outputs in ParDoInProcessEvaluator

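Outputting to a side output tag that was not declared via withOutputTags
previously caused a NullPointerException in BundleOutputManager.output,
because no output bundle exists for such a tag. Elements sent to an
undeclared side output are now kept out of the output bundles, so the
value of an Undeclared Side Output is ignored by the
InProcessPipelineRunner.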

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115489641


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/045e3436
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/045e3436
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/045e3436

Branch: refs/heads/master
Commit: 045e3436e6b2ce9593c1f8ebeaa57ea7d229134e
Parents: 1cc0211
Author: tgroh <tgroh@google.com>
Authored: Wed Feb 24 13:56:45 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Thu Feb 25 23:58:27 2016 -0800

----------------------------------------------------------------------
 .../inprocess/ParDoInProcessEvaluator.java      | 17 ++++-
 .../ParDoMultiEvaluatorFactoryTest.java         | 74 ++++++++++++++++++++
 .../ParDoSingleEvaluatorFactoryTest.java        | 42 +++++++++++
 3 files changed, 131 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045e3436/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
index a2b083b..f0b2ca2 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoInProcessEvaluator.java
@@ -27,7 +27,10 @@ import com.google.cloud.dataflow.sdk.values.TupleTag;
 
 import org.joda.time.Instant;
 
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 class ParDoInProcessEvaluator<T> {
@@ -61,6 +64,7 @@ class ParDoInProcessEvaluator<T> {
 
   static class BundleOutputManager implements OutputManager {
     private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+    private final Map<TupleTag<?>, List<?>> undeclaredOutputs;
 
     public static BundleOutputManager create(Map<TupleTag<?>, UncommittedBundle<?>>
outputBundles) {
       return new BundleOutputManager(outputBundles);
@@ -68,6 +72,7 @@ class ParDoInProcessEvaluator<T> {
 
     private BundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>>
bundles) {
       this.bundles = bundles;
+      undeclaredOutputs = new HashMap<>();
     }
 
     @SuppressWarnings("unchecked")
@@ -75,8 +80,16 @@ class ParDoInProcessEvaluator<T> {
     public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
       @SuppressWarnings("rawtypes")
       UncommittedBundle bundle = bundles.get(tag);
-      bundle.add(output);
+      if (bundle == null) {
+        List undeclaredContents = undeclaredOutputs.get(tag);
+        if (undeclaredContents == null) {
+          undeclaredContents = new ArrayList<T>();
+          undeclaredOutputs.put(tag, undeclaredContents);
+        }
+        undeclaredContents.add(output);
+      } else {
+        bundle.add(output);
+      }
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045e3436/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
index 5251a76..c55a9d5 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
@@ -137,5 +137,79 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
             WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
             WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
   }
+
+  @Test
+  public void testParDoMultiUndeclaredSideOutput() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    TupleTag<KV<String, Integer>> mainOutputTag = new TupleTag<KV<String,
Integer>>() {};
+    final TupleTag<String> elementTag = new TupleTag<>();
+    final TupleTag<Integer> lengthTag = new TupleTag<>();
+
+    BoundMulti<String, KV<String, Integer>> pardo =
+        ParDo.of(new DoFn<String, KV<String, Integer>>() {
+          @Override
+          public void processElement(ProcessContext c) {
+            c.output(KV.<String, Integer>of(c.element(), c.element().length()));
+            c.sideOutput(elementTag, c.element());
+            c.sideOutput(lengthTag, c.element().length());
+          }
+        }).withOutputTags(mainOutputTag, TupleTagList.of(elementTag));
+    PCollectionTuple outputTuple = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+
+    PCollection<KV<String, Integer>> mainOutput = outputTuple.get(mainOutputTag);
+    PCollection<String> elementOutput = outputTuple.get(elementTag);
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle = InProcessBundle.unkeyed(mainOutput);
+    UncommittedBundle<String> elementOutputBundle = InProcessBundle.unkeyed(elementOutput);
+
+    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+    when(evaluationContext.createBundle(inputBundle, elementOutput))
+        .thenReturn(elementOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext();
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal()))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator<String> evaluator
=
+        new ParDoMultiEvaluatorFactory().forApplication(
+            mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(),
+        Matchers.<UncommittedBundle<?>>containsInAnyOrder(
+            mainOutputBundle, elementOutputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+
+    assertThat(
+        mainOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<KV<String, Integer>>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(KV.of("foo", 3)),
+            WindowedValue.timestampedValueInGlobalWindow(KV.of("bara", 4), new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(
+                KV.of("bazam", 5), PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(
+        elementOutputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<String>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow("foo"),
+            WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)),
+            WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/045e3436/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
index c2e148b..4fc765c 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
@@ -34,6 +34,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
 
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
@@ -92,5 +93,46 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
             WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
             WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
   }
+
+  @Test
+  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
+    PCollection<Integer> collection = input.apply(ParDo.of(new DoFn<String, Integer>()
{
+      @Override public void processElement(ProcessContext c) {
+        c.sideOutput(sideOutputTag, c.element().length());
+      }
+    }));
+    CommittedBundle<String> inputBundle = InProcessBundle.unkeyed(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle =
+        InProcessBundle.unkeyed(collection);
+    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext();
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal()))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory().forApplication(
+            collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+  }
 }
 


Mime
View raw message