beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamesmal...@apache.org
Subject [33/50] [abbrv] incubator-beam git commit: Handle multiple requests in InProcess Read Primitives
Date Fri, 26 Feb 2016 22:55:10 GMT
Handle multiple requests in InProcess Read Primitives

Each source that is invoked by a read should produce its elements in
serial. Using a queue of available sources enforces only one worker
having access to a source at a time.

Add EmptyTransformEvaluator, to be returned in the case that there are
no unused sources. EmptyTransformEvaluator ignores all input, produces
no output, and cannot advance the watermark.

----Release Notes----
[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=115578920


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3eb30924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3eb30924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3eb30924

Branch: refs/heads/master
Commit: 3eb309242047f08afb105cc8a4e0c05f72131fec
Parents: 6c71040
Author: tgroh <tgroh@google.com>
Authored: Thu Feb 25 10:44:41 2016 -0800
Committer: Davor Bonaci <davorbonaci@users.noreply.github.com>
Committed: Thu Feb 25 23:58:28 2016 -0800

----------------------------------------------------------------------
 .../inprocess/BoundedReadEvaluatorFactory.java  | 61 ++++++++++----
 .../inprocess/EmptyTransformEvaluator.java      | 49 ++++++++++++
 .../UnboundedReadEvaluatorFactory.java          | 75 +++++++++++++-----
 .../BoundedReadEvaluatorFactoryTest.java        | 75 ++++++++++++++----
 .../UnboundedReadEvaluatorFactoryTest.java      | 83 +++++++++++++++-----
 5 files changed, 273 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
index d11187c..1c02798 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactory.java
@@ -27,8 +27,10 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
 import java.io.IOException;
-import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.annotation.Nullable;
 
@@ -42,34 +44,62 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    * Evaluators are cached here to ensure that the reader is not restarted if the evaluator is
    * retriggered.
    */
-  private final Map<EvaluatorKey, BoundedReadEvaluator<?>> sourceEvaluators =
-      new ConcurrentHashMap<>();
+  private final ConcurrentMap<EvaluatorKey, Queue<? extends BoundedReadEvaluator<?>>>
+      sourceEvaluators = new ConcurrentHashMap<>();
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   public <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application,
       @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+      InProcessEvaluationContext evaluationContext)
+      throws IOException {
     return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
   private <OutputT> TransformEvaluator<?> getTransformEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>>
transform,
-      final InProcessEvaluationContext evaluationContext) {
+      final InProcessEvaluationContext evaluationContext)
+      throws IOException {
+    BoundedReadEvaluator<?> evaluator =
+        getTransformEvaluatorQueue(transform, evaluationContext).poll();
+    if (evaluator == null) {
+      return EmptyTransformEvaluator.create(transform);
+    }
+    return evaluator;
+  }
+
+  /**
+   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
+   * provided application of {@link Bounded Read.Bounded}, initializing it if required.
+   *
+   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
+   * already done so.
+   */
+  @SuppressWarnings("unchecked")
+  private <OutputT> Queue<BoundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
+      final AppliedPTransform<?, PCollection<OutputT>, Bounded<OutputT>>
transform,
+      final InProcessEvaluationContext evaluationContext)
+      throws IOException {
+    // Key by the application and the context the evaluation is occurring in (which call to
+    // Pipeline#run).
     EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
-    @SuppressWarnings("unchecked")
-    BoundedReadEvaluator<OutputT> result =
-        (BoundedReadEvaluator<OutputT>) sourceEvaluators.get(key);
-    if (result == null) {
-      try {
-        result = new BoundedReadEvaluator<OutputT>(transform, evaluationContext);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
+    Queue<BoundedReadEvaluator<OutputT>> evaluatorQueue =
+        (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+    if (evaluatorQueue == null) {
+      evaluatorQueue = new ConcurrentLinkedQueue<>();
+      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+        // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
+        // factory for this transform
+        BoundedReadEvaluator<OutputT> evaluator =
+            new BoundedReadEvaluator<OutputT>(transform, evaluationContext);
+        evaluatorQueue.offer(evaluator);
+      } else {
+        // otherwise return the existing Queue that arrived before us
+        evaluatorQueue = (Queue<BoundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
       }
-      sourceEvaluators.put(key, result);
     }
-    return result;
+    return evaluatorQueue;
   }
 
   private static class BoundedReadEvaluator<OutputT> implements TransformEvaluator<Object>
{
@@ -108,4 +138,3 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
new file mode 100644
index 0000000..fc09237
--- /dev/null
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/EmptyTransformEvaluator.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2016 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+/**
+ * A {@link TransformEvaluator} that ignores all input and produces no output. The result of
+ * invoking {@link #finishBundle()} on this evaluator is to return an
+ * {@link InProcessTransformResult} with no elements and a timestamp hold equal to
+ * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. Because the result contains no elements, this hold
+ * will not affect the watermark.
+ */
+final class EmptyTransformEvaluator<T> implements TransformEvaluator<T> {
+  public static <T> TransformEvaluator<T> create(AppliedPTransform<?, ?, ?>
transform) {
+    return new EmptyTransformEvaluator<T>(transform);
+  }
+
+  private final AppliedPTransform<?, ?, ?> transform;
+
+  private EmptyTransformEvaluator(AppliedPTransform<?, ?, ?> transform) {
+    this.transform = transform;
+  }
+
+  @Override
+  public void processElement(WindowedValue<T> element) throws Exception {}
+
+  @Override
+  public InProcessTransformResult finishBundle() throws Exception {
+    return StepTransformResult.withHold(transform, BoundedWindow.TIMESTAMP_MIN_VALUE)
+        .build();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
index 1852cee..4beac33 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactory.java
@@ -29,8 +29,10 @@ import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
 
 import javax.annotation.Nullable;
 
@@ -44,42 +46,74 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
    * Evaluators are cached here to ensure that the checkpoint mark is appropriately reused
    * and any splits are honored.
    */
-  private final Map<EvaluatorKey, UnboundedReadEvaluator<?>> sourceEvaluators
= new HashMap<>();
+  private final ConcurrentMap<EvaluatorKey, Queue<? extends UnboundedReadEvaluator<?>>>
+      sourceEvaluators = new ConcurrentHashMap<>();
 
   @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      @Nullable CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
+  public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?,
?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext)
{
     return getTransformEvaluator((AppliedPTransform) application, evaluationContext);
   }
 
   private <OutputT> TransformEvaluator<?> getTransformEvaluator(
       final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>>
transform,
       final InProcessEvaluationContext evaluationContext) {
+    UnboundedReadEvaluator<?> currentEvaluator =
+        getTransformEvaluatorQueue(transform, evaluationContext).poll();
+    if (currentEvaluator == null) {
+      return EmptyTransformEvaluator.create(transform);
+    }
+    return currentEvaluator;
+  }
+
+  /**
+   * Get the queue of {@link TransformEvaluator TransformEvaluators} that produce elements for the
+   * provided application of {@link Unbounded Read.Unbounded}, initializing it if required.
+   *
+   * <p>This method is thread-safe, and will only produce new evaluators if no other invocation has
+   * already done so.
+   */
+  @SuppressWarnings("unchecked")
+  private <OutputT> Queue<UnboundedReadEvaluator<OutputT>> getTransformEvaluatorQueue(
+      final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>>
transform,
+      final InProcessEvaluationContext evaluationContext) {
+    // Key by the application and the context the evaluation is occurring in (which call to
+    // Pipeline#run).
     EvaluatorKey key = new EvaluatorKey(transform, evaluationContext);
     @SuppressWarnings("unchecked")
-    UnboundedReadEvaluator<OutputT> result =
-        (UnboundedReadEvaluator<OutputT>) sourceEvaluators.get(key);
-    if (result == null) {
-      result = new UnboundedReadEvaluator<OutputT>(transform, evaluationContext);
-      sourceEvaluators.put(key, result);
+    Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue =
+        (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+    if (evaluatorQueue == null) {
+      evaluatorQueue = new ConcurrentLinkedQueue<>();
+      if (sourceEvaluators.putIfAbsent(key, evaluatorQueue) == null) {
+        // If no queue existed in the evaluators, add an evaluator to initialize the evaluator
+        // factory for this transform
+        UnboundedReadEvaluator<OutputT> evaluator =
+            new UnboundedReadEvaluator<OutputT>(transform, evaluationContext, evaluatorQueue);
+        evaluatorQueue.offer(evaluator);
+      } else {
+        // otherwise return the existing Queue that arrived before us
+        evaluatorQueue = (Queue<UnboundedReadEvaluator<OutputT>>) sourceEvaluators.get(key);
+      }
     }
-    return result;
+    return evaluatorQueue;
   }
 
   private static class UnboundedReadEvaluator<OutputT> implements TransformEvaluator<Object>
{
     private static final int ARBITRARY_MAX_ELEMENTS = 10;
     private final AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>>
transform;
     private final InProcessEvaluationContext evaluationContext;
+    private final Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue;
     private CheckpointMark checkpointMark;
 
     public UnboundedReadEvaluator(
         AppliedPTransform<?, PCollection<OutputT>, Unbounded<OutputT>>
transform,
-        InProcessEvaluationContext evaluationContext) {
+        InProcessEvaluationContext evaluationContext,
+        Queue<UnboundedReadEvaluator<OutputT>> evaluatorQueue) {
       this.transform = transform;
       this.evaluationContext = evaluationContext;
+      this.evaluatorQueue = evaluatorQueue;
       this.checkpointMark = null;
     }
 
@@ -103,11 +137,14 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory
{
       }
       checkpointMark = reader.getCheckpointMark();
       checkpointMark.finalizeCheckpoint();
-      // TODO: When exercising create initial splits, make this the minimum across all existing
-      // readers
-      return StepTransformResult.withHold(transform, reader.getWatermark())
-          .addOutput(output)
-          .build();
+      // TODO: When exercising create initial splits, make this the minimum watermark across all
+      // existing readers
+      StepTransformResult result =
+          StepTransformResult.withHold(transform, reader.getWatermark())
+              .addOutput(output)
+              .build();
+      evaluatorQueue.offer(this);
+      return result;
     }
 
     private <CheckpointMarkT extends CheckpointMark> UnboundedReader<OutputT>
createReader(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
index 0a4c4a1..e17926d 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/BoundedReadEvaluatorFactoryTest.java
@@ -33,6 +33,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -42,14 +43,23 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class BoundedReadEvaluatorFactoryTest {
-  @Test
-  public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception
{
-    BoundedSource<Long> source = CountingSource.upTo(10L);
+  private BoundedSource<Long> source;
+  private PCollection<Long> longs;
+  private TransformEvaluatorFactory factory;
+  private InProcessEvaluationContext context;
+
+  @Before
+  public void setup() {
+    source = CountingSource.upTo(10L);
     TestPipeline p = TestPipeline.create();
-    PCollection<Long> longs = p.apply(Read.from(source));
+    longs = p.apply(Read.from(source));
+
+    factory = new BoundedReadEvaluatorFactory();
+    context = mock(InProcessEvaluationContext.class);
+  }
 
-    TransformEvaluatorFactory factory = new BoundedReadEvaluatorFactory();
-    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
+  @Test
+  public void boundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception
{
     UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
 
@@ -63,14 +73,13 @@ public class BoundedReadEvaluatorFactoryTest {
             gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
   }
 
+  /**
+   * Demonstrate that acquiring multiple {@link TransformEvaluator TransformEvaluators} for the same
+   * {@link Bounded Read.Bounded} application with the same evaluation context only produces the
+   * elements once.
+   */
   @Test
-  public void boundedSourceInMemoryTransformEvaluatorMultipleCalls() throws Exception {
-    BoundedSource<Long> source = CountingSource.upTo(10L);
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> longs = p.apply(Read.from(source));
-
-    TransformEvaluatorFactory factory = new BoundedReadEvaluatorFactory();
-    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
+  public void boundedSourceInMemoryTransformEvaluatorAfterFinishIsEmpty() throws Exception
{
     UncommittedBundle<Long> output =
         InProcessBundle.unkeyed(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
@@ -91,7 +100,45 @@ public class BoundedReadEvaluatorFactoryTest {
     TransformEvaluator<?> secondEvaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
     InProcessTransformResult secondResult = secondEvaluator.finishBundle();
-    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+    assertThat(secondResult.getOutputBundles(), emptyIterable());
+    assertThat(
+        secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable());
+    assertThat(
+        outputElements,
+        containsInAnyOrder(
+            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
+  }
+
+  /**
+   * Demonstrates that acquiring multiple evaluators from the factory are independent, but
+   * the elements in the source are only produced once.
+   */
+  @Test
+  public void boundedSourceEvaluatorSimultaneousEvaluations() throws Exception {
+    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+    UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+    when(context.createRootBundle(longs)).thenReturn(output).thenReturn(secondOutput);
+
+    // create both evaluators before finishing either.
+    TransformEvaluator<?> evaluator =
+        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+    TransformEvaluator<?> secondEvaluator =
+        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+
+    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    Iterable<? extends WindowedValue<Long>> outputElements =
+        output.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements();
+
+    assertThat(
+        outputElements,
+        containsInAnyOrder(
+            gw(1L), gw(2L), gw(4L), gw(8L), gw(9L), gw(7L), gw(6L), gw(5L), gw(3L), gw(0L)));
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+    assertThat(secondResult.getOutputBundles(), emptyIterable());
     assertThat(
         secondOutput.commit(BoundedWindow.TIMESTAMP_MAX_VALUE).getElements(), emptyIterable());
     assertThat(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3eb30924/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
index 28f2db5..8640056 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/UnboundedReadEvaluatorFactoryTest.java
@@ -16,6 +16,8 @@
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -28,6 +30,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.U
 import com.google.cloud.dataflow.sdk.runners.inprocess.util.InProcessBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -36,6 +39,7 @@ import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.joda.time.Instant;
 import org.joda.time.ReadableInstant;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -44,50 +48,56 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class UnboundedReadEvaluatorFactoryTest {
-  @Test
-  public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception
{
+  private PCollection<Long> longs;
+  private TransformEvaluatorFactory factory;
+  private InProcessEvaluationContext context;
+  private UncommittedBundle<Long> output;
+
+  @Before
+  public void setup() {
     UnboundedSource<Long, ?> source =
         CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
     TestPipeline p = TestPipeline.create();
-    PCollection<Long> longs = p.apply(Read.from(source));
+    longs = p.apply(Read.from(source));
 
-    TransformEvaluatorFactory factory = new UnboundedReadEvaluatorFactory();
-    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
+    factory = new UnboundedReadEvaluatorFactory();
+    context = mock(InProcessEvaluationContext.class);
+    output = InProcessBundle.unkeyed(longs);
     when(context.createRootBundle(longs)).thenReturn(output);
+  }
 
+  @Test
+  public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception
{
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
+
     InProcessTransformResult result = evaluator.finishBundle();
     assertThat(
         result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
     assertThat(
         output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L),
tgw(5L),
-            tgw(3L), tgw(0L)));
+        containsInAnyOrder(
+            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
+            tgw(0L)));
   }
 
+  /**
+   * Demonstrate that multiple sequential creations will produce additional elements if the source
+   * can provide them.
+   */
   @Test
-  public void unboundedSourceInMemoryTransformEvaluatorMultipleCalls() throws Exception {
-    UnboundedSource<Long, ?> source =
-        CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
-    TestPipeline p = TestPipeline.create();
-    PCollection<Long> longs = p.apply(Read.from(source));
-
-    TransformEvaluatorFactory factory = new UnboundedReadEvaluatorFactory();
-    InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
-    UncommittedBundle<Long> output = InProcessBundle.unkeyed(longs);
-    when(context.createRootBundle(longs)).thenReturn(output);
-
+  public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception
{
     TransformEvaluator<?> evaluator =
         factory.forApplication(longs.getProducingTransformInternal(), null, context);
+
     InProcessTransformResult result = evaluator.finishBundle();
     assertThat(
         result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
     assertThat(
         output.commit(Instant.now()).getElements(),
-        containsInAnyOrder(tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L),
tgw(5L),
-            tgw(3L), tgw(0L)));
+        containsInAnyOrder(
+            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
+            tgw(0L)));
 
     UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
     when(context.createRootBundle(longs)).thenReturn(secondOutput);
@@ -103,6 +113,37 @@ public class UnboundedReadEvaluatorFactoryTest {
             tgw(15L), tgw(13L), tgw(10L)));
   }
 
+  // TODO: Once the source is split into multiple sources before evaluating, this test will have to
+  // be updated.
+  /**
+   * Demonstrate that only a single unfinished instance of TransformEvaluator can be created at a
+   * time, with other calls returning an empty evaluator.
+   */
+  @Test
+  public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception
{
+    UncommittedBundle<Long> secondOutput = InProcessBundle.unkeyed(longs);
+
+    TransformEvaluator<?> evaluator =
+        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+
+    TransformEvaluator<?> secondEvaluator =
+        factory.forApplication(longs.getProducingTransformInternal(), null, context);
+
+    InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+    InProcessTransformResult result = evaluator.finishBundle();
+
+    assertThat(
+        result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+    assertThat(
+        output.commit(Instant.now()).getElements(),
+        containsInAnyOrder(
+            tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
+            tgw(0L)));
+
+    assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+    assertThat(secondOutput.commit(Instant.now()).getElements(), emptyIterable());
+  }
+
   /**
    * A terse alias for producing timestamped longs in the {@link GlobalWindow}, where
    * the timestamp is the epoch offset by the value of the element.


Mime
View raw message