beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/2] incubator-beam git commit: [BEAM-1149] Explode windows when fn uses side inputs
Date Tue, 13 Dec 2016 23:06:55 GMT
[BEAM-1149] Explode windows when fn uses side inputs


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a90c4285
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a90c4285
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a90c4285

Branch: refs/heads/master
Commit: a90c4285053821d0015f56be52d81bd18994e405
Parents: dad5ba5
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue Dec 13 14:35:33 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Dec 13 15:06:46 2016 -0800

----------------------------------------------------------------------
 .../core/PushbackSideInputDoFnRunner.java       | 23 +++-------
 .../core/PushbackSideInputDoFnRunnerTest.java   | 16 +++----
 .../apache/beam/sdk/transforms/ParDoTest.java   | 45 ++++++++++++++++++++
 3 files changed, 60 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a90c4285/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 460154d..0bb9153 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -71,32 +71,23 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
    */
   public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
     if (views.isEmpty()) {
+      // When there are no side inputs, we can preserve the compressed representation.
       processElement(elem);
       return Collections.emptyList();
     }
-    ImmutableList.Builder<BoundedWindow> readyWindowsBuilder = ImmutableList.builder();
-    ImmutableList.Builder<BoundedWindow> pushedBackWindowsBuilder = ImmutableList.builder();
+    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
     for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
       BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
       if (isReady(mainInputWindow)) {
-        readyWindowsBuilder.add(mainInputWindow);
+        // When there are any side inputs, we have to process the element in each window
+        // individually, to disambiguate access to per-window side inputs.
+        processElement(windowElem);
       } else {
         notReadyWindows.add(mainInputWindow);
-        pushedBackWindowsBuilder.add(mainInputWindow);
+        pushedBack.add(windowElem);
       }
     }
-    ImmutableList<BoundedWindow> readyWindows = readyWindowsBuilder.build();
-    ImmutableList<BoundedWindow> pushedBackWindows = pushedBackWindowsBuilder.build();
-    if (!readyWindows.isEmpty()) {
-      processElement(
-          WindowedValue.of(
-              elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane()));
-    }
-    return pushedBackWindows.isEmpty()
-        ? ImmutableList.<WindowedValue<InputT>>of()
-        : ImmutableList.of(
-            WindowedValue.of(
-                elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane()));
+    return pushedBack.build();
   }
 
   private boolean isReady(BoundedWindow mainInputWindow) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a90c4285/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index f8f4604..176ab26 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.core;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -130,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest {
             PaneInfo.ON_TIME_AND_ONLY_FIRING);
     Iterable<WindowedValue<Integer>> multiWindowPushback =
         runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, contains(multiWindow));
+    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
     assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
   }
 
@@ -165,10 +165,8 @@ public class PushbackSideInputDoFnRunnerTest {
         underlying.inputElems,
         containsInAnyOrder(
             WindowedValue.of(
-                2,
-                new Instant(-2),
-                ImmutableList.of(littleWindow, bigWindow),
-                PaneInfo.NO_FIRING)));
+                2, new Instant(-2), ImmutableList.of(littleWindow), PaneInfo.NO_FIRING),
+            WindowedValue.of(2, new Instant(-2), ImmutableList.of(bigWindow), PaneInfo.NO_FIRING)));
   }
 
   @Test
@@ -191,8 +189,9 @@ public class PushbackSideInputDoFnRunnerTest {
     Iterable<WindowedValue<Integer>> multiWindowPushback =
         runner.processElementInReadyWindows(multiWindow);
     assertThat(multiWindowPushback, emptyIterable());
-    assertThat(underlying.inputElems,
-        containsInAnyOrder(ImmutableList.of(multiWindow).toArray()));
+    assertThat(
+        underlying.inputElems,
+        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
   }
 
   @Test
@@ -212,6 +211,7 @@ public class PushbackSideInputDoFnRunnerTest {
     Iterable<WindowedValue<Integer>> multiWindowPushback =
         runner.processElementInReadyWindows(multiWindow);
     assertThat(multiWindowPushback, emptyIterable());
+    // Should preserve the compressed representation when there's no side inputs.
     assertThat(underlying.inputElems, containsInAnyOrder(multiWindow));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a90c4285/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 2d118e4..4a3e2dd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
@@ -88,6 +89,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.joda.time.MutableDateTime;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -724,6 +726,49 @@ public class ParDoTest implements Serializable {
     pipeline.run();
   }
 
+  private static class FnWithSideInputs extends DoFn<String, String> {
+    private final PCollectionView<Integer> view;
+
+    private FnWithSideInputs(PCollectionView<Integer> view) {
+      this.view = view;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element() + ":" + c.sideInput(view));
+    }
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testSideInputsWithMultipleWindows() {
+    // Tests that the runner can safely run a DoFn that uses side inputs
+    // on an input where the element is in multiple windows. The complication is
+    // that side inputs are per-window, so the runner has to make sure
+    // to process each window individually.
+    Pipeline p = TestPipeline.create();
+
+    MutableDateTime mutableNow = Instant.now().toMutableDateTime();
+    mutableNow.setMillisOfSecond(0);
+    Instant now = mutableNow.toInstant();
+
+    SlidingWindows windowFn =
+        SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
+    PCollectionView<Integer> view = p.apply(Create.of(1)).apply(View.<Integer>asSingleton());
+    PCollection<String> res =
+        p.apply(Create.timestamped(TimestampedValue.of("a", now)))
+            .apply(Window.<String>into(windowFn))
+            .apply(ParDo.of(new FnWithSideInputs(view)).withSideInputs(view));
+
+    for (int i = 0; i < 4; ++i) {
+      Instant base = now.minus(Duration.standardSeconds(i));
+      IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
+      PAssert.that(res).inWindow(window).containsInAnyOrder("a:1");
+    }
+
+    p.run();
+  }
+
   @Test
   @Category(NeedsRunner.class)
   public void testParDoWithErrorInStartBatch() {


Mime
View raw message