beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/3] incubator-beam git commit: Use PushbackDoFnRunner in the ParDoInProcessEvaluator
Date Tue, 10 May 2016 17:28:21 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 7c917a6ee -> 487052588


Use PushbackDoFnRunner in the ParDoInProcessEvaluator

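This ensures that the evaluator does not block while processing an input
bundle: elements whose side inputs are not yet ready in their window are
pushed back and reported as unprocessed elements instead of being processed.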


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dd4ef6ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dd4ef6ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dd4ef6ff

Branch: refs/heads/master
Commit: dd4ef6ffc67d4776d115bd6a77483c6f2fd66ae5
Parents: d3b96bc
Author: Thomas Groh <tgroh@google.com>
Authored: Wed Apr 27 17:27:57 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue May 10 10:15:14 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/ParDoInProcessEvaluator.java |  24 ++-
 .../direct/ParDoInProcessEvaluatorTest.java     | 214 +++++++++++++++++++
 .../sdk/util/PushbackSideInputDoFnRunner.java   |   2 +-
 .../sdk/util/IdentitySideInputWindowFn.java     |   2 +-
 4 files changed, 235 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
index 1c51738..2cdf6cb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
@@ -25,6 +25,8 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.util.DoFnRunner;
 import org.apache.beam.sdk.util.DoFnRunners;
 import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.PushbackSideInputDoFnRunner;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -34,6 +36,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
+import com.google.common.collect.ImmutableList;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -65,17 +69,21 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
           evaluationContext.createBundle(inputBundle, outputEntry.getValue()));
     }
 
-    DoFnRunner<InputT, OutputT> runner =
+    ReadyCheckingSideInputReader sideInputReader =
+        evaluationContext.createSideInputReader(sideInputs);
+    DoFnRunner<InputT, OutputT> underlying =
         DoFnRunners.createDefault(
             evaluationContext.getPipelineOptions(),
             SerializableUtils.clone(fn),
-            evaluationContext.createSideInputReader(sideInputs),
+            sideInputReader,
             BundleOutputManager.create(outputBundles),
             mainOutputTag,
             sideOutputTags,
             stepContext,
             counters.getAddCounterMutator(),
             application.getInput().getWindowingStrategy());
+    PushbackSideInputDoFnRunner<InputT, OutputT> runner =
+        PushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader);
 
     try {
       runner.startBundle();
@@ -89,14 +97,16 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
 
   ////////////////////////////////////////////////////////////////////////////////////////////////
 
-  private final DoFnRunner<T, ?> fnRunner;
+  private final PushbackSideInputDoFnRunner<T, ?> fnRunner;
   private final AppliedPTransform<PCollection<T>, ?, ?> transform;
   private final CounterSet counters;
   private final Collection<UncommittedBundle<?>> outputBundles;
   private final InProcessStepContext stepContext;
 
+  private final ImmutableList.Builder<WindowedValue<T>> unprocessedElements;
+
   private ParDoInProcessEvaluator(
-      DoFnRunner<T, ?> fnRunner,
+      PushbackSideInputDoFnRunner<T, ?> fnRunner,
       AppliedPTransform<PCollection<T>, ?, ?> transform,
       CounterSet counters,
       Collection<UncommittedBundle<?>> outputBundles,
@@ -106,12 +116,15 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
     this.counters = counters;
     this.outputBundles = outputBundles;
     this.stepContext = stepContext;
+
+    this.unprocessedElements = ImmutableList.builder();
   }
 
   @Override
   public void processElement(WindowedValue<T> element) {
     try {
-      fnRunner.processElement(element);
+      Iterable<WindowedValue<T>> unprocessed = fnRunner.processElementInReadyWindows(element);
+      unprocessedElements.addAll(unprocessed);
     } catch (Exception e) {
       throw UserCodeException.wrap(e);
     }
@@ -137,6 +150,7 @@ class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> {
         .addOutput(outputBundles)
         .withTimerUpdate(stepContext.getTimerUpdate())
         .withCounters(counters)
+        .addUnprocessedElements(unprocessedElements.build())
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
new file mode 100644
index 0000000..ca15d9c
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests for {@link ParDoInProcessEvaluator}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoInProcessEvaluatorTest {
+  @Mock private InProcessEvaluationContext evaluationContext;
+  private PCollection<Integer> inputPc;
+  private TupleTag<Integer> mainOutputTag;
+  private List<TupleTag<?>> sideOutputTags;
+  private BundleFactory bundleFactory;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    TestPipeline p = TestPipeline.create();
+    inputPc = p.apply(Create.of(1, 2, 3));
+    mainOutputTag = new TupleTag<Integer>() {};
+    sideOutputTags = TupleTagList.empty().getAll();
+
+    bundleFactory = InProcessBundleFactory.create();
+  }
+
+  @Test
+  public void sideInputsNotReadyResultHasUnprocessedElements() {
+    PCollectionView<Integer> singletonView =
+        inputPc
+            .apply(Window.into(new IdentitySideInputWindowFn()))
+            .apply(View.<Integer>asSingleton().withDefaultValue(0));
+    RecorderFn fn = new RecorderFn(singletonView);
+    PCollection<Integer> output = inputPc.apply(ParDo.of(fn).withSideInputs(singletonView));
+
+    CommittedBundle<Integer> inputBundle =
+        bundleFactory.createRootBundle(inputPc).commit(Instant.now());
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createBundle(inputBundle, output);
+    when(evaluationContext.createBundle(inputBundle, output))
+        .thenReturn(outputBundle);
+
+    ParDoInProcessEvaluator<Integer> evaluator =
+        createEvaluator(singletonView, fn, inputBundle, output);
+
+    IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
+    WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3);
+    WindowedValue<Integer> second =
+        WindowedValue.of(2, new Instant(1234L), nonGlobalWindow, PaneInfo.NO_FIRING);
+    WindowedValue<Integer> third =
+        WindowedValue.of(
+            1,
+            new Instant(2468L),
+            ImmutableList.of(nonGlobalWindow, GlobalWindow.INSTANCE),
+            PaneInfo.NO_FIRING);
+
+    evaluator.processElement(first);
+    evaluator.processElement(second);
+    evaluator.processElement(third);
+    InProcessTransformResult result = evaluator.finishBundle();
+
+    assertThat(
+        result.getUnprocessedElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
+            second, WindowedValue.of(1, new Instant(2468L), nonGlobalWindow, PaneInfo.NO_FIRING)));
+    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
+    assertThat(RecorderFn.processed, containsInAnyOrder(1, 3));
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()).commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<?>>containsInAnyOrder(
+            first.withValue(8),
+            WindowedValue.timestampedValueInGlobalWindow(6, new Instant(2468L))));
+  }
+
+  private ParDoInProcessEvaluator<Integer> createEvaluator(
+      PCollectionView<Integer> singletonView,
+      RecorderFn fn,
+      InProcessPipelineRunner.CommittedBundle<Integer> inputBundle,
+      PCollection<Integer> output) {
+    when(
+            evaluationContext.createSideInputReader(
+                ImmutableList.<PCollectionView<?>>of(singletonView)))
+        .thenReturn(new ReadyInGlobalWindowReader());
+    InProcessExecutionContext executionContext = mock(InProcessExecutionContext.class);
+    InProcessStepContext stepContext = mock(InProcessStepContext.class);
+    when(
+            executionContext.getOrCreateStepContext(
+                Mockito.any(String.class), Mockito.any(String.class)))
+        .thenReturn(stepContext);
+    when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
+    when(
+            evaluationContext.getExecutionContext(
+                Mockito.any(AppliedPTransform.class), Mockito.any(Object.class)))
+        .thenReturn(executionContext);
+    when(evaluationContext.createCounterSet()).thenReturn(new CounterSet());
+
+    return ParDoInProcessEvaluator.create(
+        evaluationContext,
+        inputBundle,
+        (AppliedPTransform<PCollection<Integer>, ?, ?>) output.getProducingTransformInternal(),
+        fn,
+        ImmutableList.<PCollectionView<?>>of(singletonView),
+        mainOutputTag,
+        sideOutputTags,
+        ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output));
+  }
+
+  private static class RecorderFn extends DoFn<Integer, Integer> {
+    private static Collection<Integer> processed;
+    private final PCollectionView<Integer> view;
+
+    public RecorderFn(PCollectionView<Integer> view) {
+      processed = new ArrayList<>();
+      this.view = view;
+    }
+
+    @Override
+    public void processElement(DoFn<Integer, Integer>.ProcessContext c) throws Exception {
+      processed.add(c.element());
+      c.output(c.element() + c.sideInput(view));
+    }
+  }
+
+  private static class ReadyInGlobalWindowReader implements ReadyCheckingSideInputReader {
+    @Override
+    @Nullable
+    public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+      if (window.equals(GlobalWindow.INSTANCE)) {
+        return (T) (Integer) 5;
+      }
+      fail("Should only call get in the Global Window, others are not ready");
+      throw new AssertionError("Unreachable");
+    }
+
+    @Override
+    public <T> boolean contains(PCollectionView<T> view) {
+      return true;
+    }
+
+    @Override
+    public boolean isEmpty() {
+      return false;
+    }
+
+    @Override
+    public boolean isReady(PCollectionView<?> view, BoundedWindow window) {
+      return window.equals(GlobalWindow.INSTANCE);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
index 4eeedf6..b1442dd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java
@@ -32,7 +32,7 @@ import java.util.Set;
  * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
  * them via the {@link #processElementInReadyWindows(WindowedValue)}.
  */
-class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT,
OutputT> {
+public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT,
OutputT> {
   private final DoFnRunner<InputT, OutputT> underlying;
   private final Collection<PCollectionView<?>> views;
   private final ReadyCheckingSideInputReader sideInputReader;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dd4ef6ff/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index ecab6f8..db6f425 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -29,7 +29,7 @@ import java.util.Collection;
  * A {@link WindowFn} for use during tests that returns the input window for calls to
  * {@link #getSideInputWindow(BoundedWindow)}.
  */
-class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow>
{
+public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, BoundedWindow>
{
   @Override
   public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext
c)
       throws Exception {


Mime
View raw message