beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/3] incubator-beam git commit: Implement InProcessEvaluationContext
Date Tue, 15 Mar 2016 18:23:56 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 7f9270ee2 -> 46412e5f2


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java
index 4cfe782..16b4eb7 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessSideInputContainerTest.java
@@ -24,7 +24,6 @@ import static org.mockito.Mockito.doAnswer;
 
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.Mean;
@@ -137,7 +136,7 @@ public class InProcessSideInputContainerTest {
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
 
     Map<String, Integer> viewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
             .get(mapView, firstWindow);
     assertThat(viewContents, hasEntry("one", 1));
     assertThat(viewContents, hasEntry("two", 2));
@@ -153,7 +152,7 @@ public class InProcessSideInputContainerTest {
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two));
 
     Map<String, Integer> viewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
             .get(mapView, secondWindow);
     assertThat(viewContents, hasEntry("one", 1));
     assertThat(viewContents, hasEntry("two", 2));
@@ -164,7 +163,7 @@ public class InProcessSideInputContainerTest {
     container.write(mapView, ImmutableList.<WindowedValue<?>>of(three));
 
     Map<String, Integer> overwrittenViewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
             .get(mapView, secondWindow);
     assertThat(overwrittenViewContents, hasEntry("three", 3));
     assertThat(overwrittenViewContents.size(), is(1));
@@ -176,15 +175,18 @@ public class InProcessSideInputContainerTest {
    */
   @Test
   public void getBlocksUntilPaneAvailable() throws Exception {
-    BoundedWindow window = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(1024L);
-      }
-    };
+    BoundedWindow window =
+        new BoundedWindow() {
+          @Override
+          public Instant maxTimestamp() {
+            return new Instant(1024L);
+          }
+        };
     Future<Double> singletonFuture =
-        getFutureOfView(container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
-            singletonView, window);
+        getFutureOfView(
+            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)),
+            singletonView,
+            window);
 
     WindowedValue<Double> singletonValue =
         WindowedValue.of(4.75, new Instant(475L), window, PaneInfo.ON_TIME_AND_ONLY_FIRING);
@@ -203,7 +205,7 @@ public class InProcessSideInputContainerTest {
       }
     };
     SideInputReader newReader =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView));
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView));
     Future<Double> singletonFuture = getFutureOfView(newReader, singletonView, window);
 
     WindowedValue<Double> singletonValue =
@@ -216,25 +218,31 @@ public class InProcessSideInputContainerTest {
 
   @Test
   public void withPCollectionViewsErrorsForContainsNotInViews() {
-    PCollectionView<Map<String, Iterable<String>>> newView = PCollectionViews.multimapView(pipeline,
-        WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("with unknown views " + ImmutableList.of(newView).toString());
 
-    container.withViews(ImmutableList.<PCollectionView<?>>of(newView));
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
   }
 
   @Test
   public void withViewsForViewNotInContainerFails() {
-    PCollectionView<Map<String, Iterable<String>>> newView = PCollectionViews.multimapView(pipeline,
-        WindowingStrategy.globalDefault(), KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
+    PCollectionView<Map<String, Iterable<String>>> newView =
+        PCollectionViews.multimapView(
+            pipeline,
+            WindowingStrategy.globalDefault(),
+            KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("unknown views");
     thrown.expectMessage(newView.toString());
 
-    container.withViews(ImmutableList.<PCollectionView<?>>of(newView));
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(newView));
   }
 
   @Test
@@ -242,7 +250,7 @@ public class InProcessSideInputContainerTest {
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("unknown view: " + iterableView.toString());
 
-    container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+    container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
         .get(iterableView, GlobalWindow.INSTANCE);
   }
 
@@ -255,11 +263,11 @@ public class InProcessSideInputContainerTest {
             PaneInfo.ON_TIME_AND_ONLY_FIRING);
     container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, firstWindow),
         equalTo(2.875));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, secondWindow),
         equalTo(4.125));
   }
@@ -274,7 +282,7 @@ public class InProcessSideInputContainerTest {
     container.write(iterableView, ImmutableList.of(firstValue, secondValue));
 
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(iterableView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
             .get(iterableView, firstWindow),
         contains(44, 44));
   }
@@ -286,11 +294,11 @@ public class InProcessSideInputContainerTest {
             ImmutableList.of(firstWindow, secondWindow), PaneInfo.ON_TIME_AND_ONLY_FIRING);
     container.write(singletonView, ImmutableList.of(multiWindowedValue));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, firstWindow),
         equalTo(2.875));
     assertThat(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(singletonView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
             .get(singletonView, secondWindow),
         equalTo(2.875));
   }
@@ -306,7 +314,7 @@ public class InProcessSideInputContainerTest {
     immediatelyInvokeCallback(mapView, secondWindow);
 
     Map<String, Integer> viewContents =
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView))
+        container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView))
             .get(mapView, secondWindow);
 
     assertThat(viewContents, hasEntry("one", 1));
@@ -317,8 +325,11 @@ public class InProcessSideInputContainerTest {
   @Test
   public void finishOnPendingViewsSetsEmptyElements() throws Exception {
     immediatelyInvokeCallback(mapView, secondWindow);
-    Future<Map<String, Integer>> mapFuture = getFutureOfView(
-        container.withViews(ImmutableList.<PCollectionView<?>>of(mapView)), mapView, secondWindow);
+    Future<Map<String, Integer>> mapFuture =
+        getFutureOfView(
+            container.createReaderForViews(ImmutableList.<PCollectionView<?>>of(mapView)),
+            mapView,
+            secondWindow);
 
     assertThat(mapFuture.get().isEmpty(), is(true));
   }
@@ -329,18 +340,21 @@ public class InProcessSideInputContainerTest {
    */
   private void immediatelyInvokeCallback(PCollectionView<?> view, BoundedWindow window) {
     doAnswer(
-        new Answer<Void>() {
-          @Override
-          public Void answer(InvocationOnMock invocation) throws Throwable {
-            Object callback = invocation.getArguments()[3];
-            Runnable callbackRunnable = (Runnable) callback;
-            callbackRunnable.run();
-            return null;
-          }
-        })
+            new Answer<Void>() {
+              @Override
+              public Void answer(InvocationOnMock invocation) throws Throwable {
+                Object callback = invocation.getArguments()[3];
+                Runnable callbackRunnable = (Runnable) callback;
+                callbackRunnable.run();
+                return null;
+              }
+            })
         .when(context)
-        .callAfterOutputMustHaveBeenProduced(Mockito.eq(view), Mockito.eq(window),
-            Mockito.eq(view.getWindowingStrategyInternal()), Mockito.any(Runnable.class));
+        .scheduleAfterOutputWouldBeProduced(
+            Mockito.eq(view),
+            Mockito.eq(window),
+            Mockito.eq(view.getWindowingStrategyInternal()),
+            Mockito.any(Runnable.class));
   }
 
   private <ValueT> Future<ValueT> getFutureOfView(final SideInputReader myReader,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
index 033f9de..66430b6 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoMultiEvaluatorFactoryTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
index ae599ba..3b928b9 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ParDoSingleEvaluatorFactoryTest.java
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index f139c56..a9bbcc8 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.when;
 import com.google.cloud.dataflow.sdk.io.CountingSource;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
index 2f5cd0f..2f5bdde 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ViewEvaluatorFactoryTest.java
@@ -25,7 +25,6 @@ import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.InProcessEvaluationContext;
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/53db1597/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
new file mode 100644
index 0000000..be3e062
--- /dev/null
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WatermarkCallbackExecutorTest.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.Sum;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link WatermarkCallbackExecutor}.
+ */
+@RunWith(JUnit4.class)
+public class WatermarkCallbackExecutorTest {
+  private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create();
+  private AppliedPTransform<?, ?, ?> create;
+  private AppliedPTransform<?, ?, ?> sum;
+
+  @Before
+  public void setup() {
+    TestPipeline p = TestPipeline.create();
+    PCollection<Integer> created = p.apply(Create.of(1, 2, 3));
+    create = created.getProducingTransformInternal();
+    sum = created.apply(Sum.integersGlobally()).getProducingTransformInternal();
+  }
+
+  @Test
+  public void onGuaranteedFiringFiresAfterTrigger() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    executor.callOnGuaranteedFiring(
+        create,
+        GlobalWindow.INSTANCE,
+        WindowingStrategy.globalDefault(),
+        new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+  }
+
+  @Test
+  public void multipleCallbacksShouldFireFires() throws Exception {
+    CountDownLatch latch = new CountDownLatch(2);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(10)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+  }
+
+  @Test
+  public void noCallbacksShouldFire() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        create, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
+  }
+
+  @Test
+  public void unrelatedStepShouldNotFire() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(Duration.standardMinutes(10));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0L), new Instant(0L).plus(Duration.standardMinutes(10)));
+    executor.callOnGuaranteedFiring(
+        sum, window, WindowingStrategy.of(windowFn), new CountDownLatchCallback(latch));
+
+    executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20)));
+    assertThat(latch.await(500, TimeUnit.MILLISECONDS), equalTo(false));
+  }
+
+  private static class CountDownLatchCallback implements Runnable {
+    private final CountDownLatch latch;
+
+    public CountDownLatchCallback(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      latch.countDown();
+    }
+  }
+}


Mime
View raw message