beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Allow InProcess Evaluators to check Side Input completion
Date Wed, 27 Apr 2016 20:41:57 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6914f2a1d -> a9387fcaa


Allow InProcess Evaluators to check Side Input completion

This checks to ensure that the PCollectionView in the SideInputWindow
for the provided window either has elements available or is empty.

Schedule a future to ensure that the SideInputWindows are appropriately
filled with an empty iterable after retrieving the element.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7ccfd6f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7ccfd6f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7ccfd6f1

Branch: refs/heads/master
Commit: 7ccfd6f109edaec4d4f718884eaa3db86ae5b001
Parents: fdec569
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Apr 18 16:16:48 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Apr 27 13:41:28 2016 -0700

----------------------------------------------------------------------
 .../inprocess/InProcessEvaluationContext.java   |  23 +-
 .../inprocess/InProcessSideInputContainer.java  |  90 +++++--
 .../InProcessSideInputContainerTest.java        | 267 ++++++++++++++-----
 3 files changed, 277 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ccfd6f1/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
index d439ba7..3990f0d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -320,18 +320,31 @@ class InProcessEvaluationContext {
   }
 
   /**
-   * Returns a {@link SideInputReader} capable of reading the provided
+   * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided
    * {@link PCollectionView PCollectionViews}.
+   *
    * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be
able to
-   *                   read
-   * @return a {@link SideInputReader} that can read all of the provided
-   *         {@link PCollectionView PCollectionViews}
+   * read
+   * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView
+   * PCollectionViews}
    */
-  public SideInputReader createSideInputReader(final List<PCollectionView<?>>
sideInputs) {
+  public ReadyCheckingSideInputReader createSideInputReader(
+      final List<PCollectionView<?>> sideInputs) {
     return sideInputContainer.createReaderForViews(sideInputs);
   }
 
   /**
+   * A {@link SideInputReader} that allows callers to check to see if a {@link PCollectionView}
has
+   * had its contents set in a window.
+   */
+  static interface ReadyCheckingSideInputReader extends SideInputReader {
+    /**
+     * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
+     */
+    boolean isReady(PCollectionView<?> view, BoundedWindow window);
+  }
+
+  /**
    * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
    * of all other {@link CounterSet CounterSets} created by this call.
    *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ccfd6f1/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
index 6bf6e8a..fda78fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainer.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners.inprocess;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.PCollectionViewWindow;
@@ -44,6 +45,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import javax.annotation.Nullable;
 
@@ -88,11 +90,12 @@ class InProcessSideInputContainer {
   }
 
   /**
-   * Return a view of this {@link InProcessSideInputContainer} that contains only the views
in
-   * the provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable
without
+   * Return a view of this {@link InProcessSideInputContainer} that contains only the views
in the
+   * provided argument. The returned {@link InProcessSideInputContainer} is unmodifiable
without
    * casting, but will change as this {@link InProcessSideInputContainer} is modified.
    */
-  public SideInputReader createReaderForViews(Collection<PCollectionView<?>>
newContainedViews) {
+  public ReadyCheckingSideInputReader createReaderForViews(
+      Collection<PCollectionView<?>> newContainedViews) {
     if (!containedViews.containsAll(newContainedViews)) {
       Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
       Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
@@ -173,7 +176,7 @@ class InProcessSideInputContainer {
     }
   }
 
-  private final class SideInputContainerSideInputReader implements SideInputReader {
+  private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader
{
     private final Collection<PCollectionView<?>> readerViews;
 
     private SideInputContainerSideInputReader(Collection<PCollectionView<?>>
readerViews) {
@@ -181,33 +184,23 @@ class InProcessSideInputContainer {
     }
 
     @Override
+    public boolean isReady(final PCollectionView<?> view, final BoundedWindow window)
{
+      checkArgument(
+          readerViews.contains(view),
+          "Tried to check if view %s was ready in a SideInputReader that does not contain
it. "
+              + "Contained views; %s",
+          view,
+          readerViews);
+      return getViewFuture(view, window).isDone();
+    }
+
+    @Override
     @Nullable
     public <T> T get(final PCollectionView<T> view, final BoundedWindow window)
{
       checkArgument(
           readerViews.contains(view), "calling get(PCollectionView) with unknown view: "
+ view);
-      PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
       try {
-        final SettableFuture<Iterable<? extends WindowedValue<?>>> future
=
-            viewByWindows.get(windowedView);
-
-        WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
-        evaluationContext.scheduleAfterOutputWouldBeProduced(
-            view, window, windowingStrategy, new Runnable() {
-              @Override
-              public void run() {
-                // The requested window has closed without producing elements, so reflect
that in
-                // the PCollectionView. If set has already been called, will do nothing.
-                future.set(Collections.<WindowedValue<?>>emptyList());
-          }
-
-          @Override
-          public String toString() {
-            return MoreObjects.toStringHelper("InProcessSideInputContainerEmptyCallback")
-                .add("view", view)
-                .add("window", window)
-                .toString();
-          }
-        });
+        final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view,
window);
         // Safe covariant cast
         @SuppressWarnings("unchecked")
         Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>)
future.get();
@@ -220,6 +213,23 @@ class InProcessSideInputContainer {
       }
     }
 
+    /**
+     * Gets the future containing the contents of the provided {@link PCollectionView} in
the
+     * provided {@link BoundedWindow}, setting up a callback to populate the future with
empty
+     * contents if necessary.
+     */
+    private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
+        final PCollectionView<T> view, final BoundedWindow window)  {
+      PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
+      final SettableFuture<Iterable<? extends WindowedValue<?>>> future
=
+          viewByWindows.getUnchecked(windowedView);
+
+      WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
+      evaluationContext.scheduleAfterOutputWouldBeProduced(
+          view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
+      return future;
+    }
+
     @Override
     public <T> boolean contains(PCollectionView<T> view) {
       return readerViews.contains(view);
@@ -230,4 +240,32 @@ class InProcessSideInputContainer {
       return readerViews.isEmpty();
     }
   }
+
+  private static class WriteEmptyViewContents implements Runnable {
+    private final PCollectionView<?> view;
+    private final BoundedWindow window;
+    private final SettableFuture<Iterable<? extends WindowedValue<?>>>
future;
+
+    private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
+        SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
+      this.future = future;
+      this.view = view;
+      this.window = window;
+    }
+
+    @Override
+    public void run() {
+      // The requested window has closed without producing elements, so reflect that in
+      // the PCollectionView. If set has already been called, will do nothing.
+      future.set(Collections.<WindowedValue<?>>emptyList());
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("view", view)
+          .add("window", window)
+          .toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7ccfd6f1/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
index 9922413..03443f8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/inprocess/InProcessSideInputContainerTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.doAnswer;
 
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.runners.inprocess.InProcessEvaluationContext.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Mean;
@@ -68,6 +69,32 @@ import java.util.concurrent.Future;
  */
 @RunWith(JUnit4.class)
 public class InProcessSideInputContainerTest {
+  private static final BoundedWindow FIRST_WINDOW =
+      new BoundedWindow() {
+        @Override
+        public Instant maxTimestamp() {
+          return new Instant(789541L);
+        }
+
+        @Override
+        public String toString() {
+          return "firstWindow";
+        }
+      };
+
+  private static final BoundedWindow SECOND_WINDOW =
+      new BoundedWindow() {
+        @Override
+        public Instant maxTimestamp() {
+          return new Instant(14564786L);
+        }
+
+        @Override
+        public String toString() {
+          return "secondWindow";
+        }
+      };
+
   @Rule
   public ExpectedException thrown = ExpectedException.none();
 
@@ -84,30 +111,6 @@ public class InProcessSideInputContainerTest {
   // Not present in container.
   private PCollectionView<Iterable<Integer>> iterableView;
 
-  private BoundedWindow firstWindow = new BoundedWindow() {
-    @Override
-    public Instant maxTimestamp() {
-      return new Instant(789541L);
-    }
-
-    @Override
-    public String toString() {
-      return "firstWindow";
-    }
-  };
-
-  private BoundedWindow secondWindow = new BoundedWindow() {
-    @Override
-    public Instant maxTimestamp() {
-      return new Instant(14564786L);
-    }
-
-    @Override
-    public String toString() {
-      return "secondWindow";
-    }
-  };
-
   @Before
   public void setup() {
     MockitoAnnotations.initMocks(this);
@@ -120,10 +123,7 @@ public class InProcessSideInputContainerTest {
         create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
             .apply("asMapView", View.<String, Integer>asMap());
 
-    singletonView =
-        create.apply("forCombinedTypes", Mean.<Integer>globally())
-            .apply("asDoubleView", View.<Double>asSingleton());
-
+    singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView());
     iterableView = create.apply("asIterableView", View.<Integer>asIterable());
 
     container = InProcessSideInputContainer.create(
@@ -132,15 +132,18 @@ public class InProcessSideInputContainerTest {
 
   @Test
   public void getAfterWriteReturnsPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one = WindowedValue.of(
-        KV.of("one", 1), new Instant(1L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<KV<String, Integer>> two = WindowedValue.of(
-        KV.of("two", 2), new Instant(20L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING);
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
 
     Map<String, Integer> viewContents =
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, firstWindow);
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, FIRST_WINDOW);
     assertThat(viewContents, hasEntry("one", 1));
     assertThat(viewContents, hasEntry("two", 2));
     assertThat(viewContents.size(), is(2));
@@ -148,26 +151,40 @@ public class InProcessSideInputContainerTest {
 
   @Test
   public void getReturnsLatestPaneInWindow() throws Exception {
-    WindowedValue<KV<String, Integer>> one = WindowedValue.of(KV.of("one", 1),
new Instant(1L),
-        secondWindow, PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two = WindowedValue.of(KV.of("two", 2),
new Instant(20L),
-        secondWindow, PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1),
+            new Instant(1L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2),
+            new Instant(20L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
 
     Map<String, Integer> viewContents =
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, secondWindow);
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
     assertThat(viewContents, hasEntry("one", 1));
     assertThat(viewContents, hasEntry("two", 2));
     assertThat(viewContents.size(), is(2));
 
-    WindowedValue<KV<String, Integer>> three = WindowedValue.of(KV.of("three",
3),
-        new Instant(300L), secondWindow, PaneInfo.createPane(false, false, Timing.EARLY,
1, -1));
+    WindowedValue<KV<String, Integer>> three =
+        WindowedValue.of(
+            KV.of("three", 3),
+            new Instant(300L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(false, false, Timing.EARLY, 1, -1));
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
 
     Map<String, Integer> overwrittenViewContents =
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, secondWindow);
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
     assertThat(overwrittenViewContents, hasEntry("three", 3));
     assertThat(overwrittenViewContents.size(), is(1));
   }
@@ -259,66 +276,98 @@ public class InProcessSideInputContainerTest {
 
   @Test
   public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception {
-    WindowedValue<Double> firstWindowedValue = WindowedValue.of(2.875,
-        firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Double> firstWindowedValue =
+        WindowedValue.of(
+            2.875,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
     WindowedValue<Double> secondWindowedValue =
-        WindowedValue.of(4.125, secondWindow.maxTimestamp().minus(2_000_000L), secondWindow,
+        WindowedValue.of(
+            4.125,
+            SECOND_WINDOW.maxTimestamp().minus(2_000_000L),
+            SECOND_WINDOW,
             PaneInfo.ON_TIME_AND_ONLY_FIRING);
     container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
     assertThat(
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, firstWindow),
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, FIRST_WINDOW),
         equalTo(2.875));
     assertThat(
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, secondWindow),
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, SECOND_WINDOW),
         equalTo(4.125));
   }
 
   @Test
   public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
-    WindowedValue<Integer> firstValue = WindowedValue.of(
-        44, firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING);
-    WindowedValue<Integer> secondValue = WindowedValue.of(
-        44, firstWindow.maxTimestamp().minus(200L), firstWindow, PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Integer> firstValue =
+        WindowedValue.of(
+            44,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+    WindowedValue<Integer> secondValue =
+        WindowedValue.of(
+            44,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            FIRST_WINDOW,
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
 
     container.write(iterableView, ImmutableList.of(firstValue, secondValue));
 
     assertThat(
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
-            .get(iterableView, firstWindow),
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
+            .get(iterableView, FIRST_WINDOW),
         contains(44, 44));
   }
 
   @Test
   public void writeForElementInMultipleWindowsSucceeds() throws Exception {
     WindowedValue<Double> multiWindowedValue =
-        WindowedValue.of(2.875, firstWindow.maxTimestamp().minus(200L),
-            ImmutableList.of(firstWindow, secondWindow), PaneInfo.ON_TIME_AND_ONLY_FIRING);
+        WindowedValue.of(
+            2.875,
+            FIRST_WINDOW.maxTimestamp().minus(200L),
+            ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
     container.write(singletonView, ImmutableList.of(multiWindowedValue));
     assertThat(
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, firstWindow),
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, FIRST_WINDOW),
         equalTo(2.875));
     assertThat(
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
-            .get(singletonView, secondWindow),
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+            .get(singletonView, SECOND_WINDOW),
         equalTo(2.875));
   }
 
   @Test
   public void finishDoesNotOverwriteWrittenElements() throws Exception {
-    WindowedValue<KV<String, Integer>> one = WindowedValue.of(KV.of("one", 1),
new Instant(1L),
-        secondWindow, PaneInfo.createPane(true, false, Timing.EARLY));
-    WindowedValue<KV<String, Integer>> two = WindowedValue.of(KV.of("two", 2),
new Instant(20L),
-        secondWindow, PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> one =
+        WindowedValue.of(
+            KV.of("one", 1),
+            new Instant(1L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
+    WindowedValue<KV<String, Integer>> two =
+        WindowedValue.of(
+            KV.of("two", 2),
+            new Instant(20L),
+            SECOND_WINDOW,
+            PaneInfo.createPane(true, false, Timing.EARLY));
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
 
-    immediatelyInvokeCallback(mapView, secondWindow);
+    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
 
     Map<String, Integer> viewContents =
-        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
-            .get(mapView, secondWindow);
+        container
+            .createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
+            .get(mapView, SECOND_WINDOW);
 
     assertThat(viewContents, hasEntry("one", 1));
     assertThat(viewContents, hasEntry("two", 2));
@@ -327,17 +376,91 @@ public class InProcessSideInputContainerTest {
 
   @Test
   public void finishOnPendingViewsSetsEmptyElements() throws Exception {
-    immediatelyInvokeCallback(mapView, secondWindow);
+    immediatelyInvokeCallback(mapView, SECOND_WINDOW);
     Future<Map<String, Integer>> mapFuture =
         getFutureOfView(
             container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
             mapView,
-            secondWindow);
+            SECOND_WINDOW);
 
     assertThat(mapFuture.get().isEmpty(), is(true));
   }
 
   /**
+   * Demonstrates that calling isReady on an empty container throws an
+   * {@link IllegalArgumentException}.
+   */
+  @Test
+  public void isReadyInEmptyReaderThrows() {
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of());
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("does not contain");
+    thrown.expectMessage(ImmutableList.of().toString());
+    reader.isReady(mapView, GlobalWindow.INSTANCE);
+  }
+
+  /**
+   * Demonstrates that calling isReady returns false until elements are written to the
+   * {@link PCollectionView}, {@link BoundedWindow} pair, at which point it returns true.
+   */
+  @Test
+  public void isReadyForSomeNotReadyViewsFalseUntilElements() {
+    container.write(
+        mapView,
+        ImmutableList.of(
+            WindowedValue.of(
+                KV.of("one", 1),
+                SECOND_WINDOW.maxTimestamp().minus(100L),
+                SECOND_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false));
+
+    container.write(
+        mapView,
+        ImmutableList.of(
+            WindowedValue.of(
+                KV.of("too", 2),
+                FIRST_WINDOW.maxTimestamp().minus(100L),
+                FIRST_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(reader.isReady(mapView, FIRST_WINDOW), is(true));
+
+    container.write(
+        singletonView,
+        ImmutableList.of(
+            WindowedValue.of(
+                1.25,
+                SECOND_WINDOW.maxTimestamp().minus(100L),
+                SECOND_WINDOW,
+                PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+    assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true));
+    assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(true));
+
+    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(false));
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+  }
+
+  @Test
+  public void isReadyForEmptyWindowTrue() {
+    immediatelyInvokeCallback(mapView, GlobalWindow.INSTANCE);
+
+    ReadyCheckingSideInputReader reader =
+        container.createReaderForViews(ImmutableList.of(mapView, singletonView));
+    assertThat(reader.isReady(mapView, GlobalWindow.INSTANCE), is(true));
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(false));
+
+    immediatelyInvokeCallback(singletonView, GlobalWindow.INSTANCE);
+    assertThat(reader.isReady(singletonView, GlobalWindow.INSTANCE), is(true));
+  }
+
+  /**
    * When a callAfterWindowCloses with the specified view's producing transform, window,
and
    * windowing strategy is invoked, immediately execute the callback.
    */


Mime
View raw message