beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [4/5] incubator-beam git commit: [BEAM-619] extend test case to be parameterized
Date Fri, 09 Sep 2016 14:12:30 GMT
[BEAM-619] extend test case to be parameterized

- extend test case with number of tasks and splits parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4afd25a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4afd25a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4afd25a7

Branch: refs/heads/master
Commit: 4afd25a7a85a24ff0212a4791661d3c5e105662b
Parents: 145ad47
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Sep 7 14:23:12 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Sep 9 16:09:44 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    |   8 ++
 .../streaming/UnboundedSourceWrapperTest.java   | 113 +++++++------------
 2 files changed, 50 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 2cd06ed..a62a754 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -397,4 +397,12 @@ public class UnboundedSourceWrapper<
   public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
     return splitSources;
   }
+
+  /**
+   * Visible so that we can check this in tests. Must not be used for anything else.
+   */
+  @VisibleForTesting
+  public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
+    return localSplitSources;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4afd25a7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 73124a9..0cc584e 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
@@ -44,78 +46,43 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Tests for {@link UnboundedSourceWrapper}.
  */
+@RunWith(Parameterized.class)
 public class UnboundedSourceWrapperTest {
 
-  /**
-   * Creates a {@link UnboundedSourceWrapper} that has exactly one reader per source, since we
-   * specify a parallelism of 1 and also at runtime tell the source that it has 1 parallel subtask.
-   */
-  @Test
-  public void testWithOneReader() throws Exception {
-    final int numElements = 20;
-    final Object checkpointLock = new Object();
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-    // elements later.
-    TestCountingSource source = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, 1);
-
-    assertEquals(1, flinkWrapper.getSplitSources().size());
-
-    StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
-    setupSourceOperator(sourceOperator);
-
-
-    try {
-      sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
+  private final int numTasks;
+  private final int numSplits;
 
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
-              count++;
-              if (count >= numElements) {
-                throw new SuccessException();
-              }
-            }
-
-            @Override
-            public void close() {
+  public UnboundedSourceWrapperTest(int numTasks, int numSplits) {
+    this.numTasks = numTasks;
+    this.numSplits = numSplits;
+  }
 
-            }
-          });
-    } catch (SuccessException e) {
-      // success
-    } catch (Exception e) {
-      fail("We caught " + e);
-    }
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    /*
+     * Parameters for initializing the tests:
+     * {numTasks, numSplits}
+     * The test currently assumes powers of two for some assertions.
+     */
+    return Arrays.asList(new Object[][] {
+      {1, 1}, {1, 2}, {1, 4},
+      {2, 1}, {2, 2}, {2, 4},
+      {4, 1}, {4, 2}, {4, 4}
+    });
   }
 
   /**
-   * Creates a {@link UnboundedSourceWrapper} that has multiple readers per source, since we
-   * specify a parallelism higher than 1 and at runtime tell the source that it has 1 parallel
-   * this means that one source will manage multiple readers.
+   * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
+   * If numSplits > numTasks, one source will manage multiple readers.
    */
   @Test
-  public void testWithMultipleReaders() throws Exception {
+  public void testReaders() throws Exception {
     final int numElements = 20;
     final Object checkpointLock = new Object();
     PipelineOptions options = PipelineOptionsFactory.create();
@@ -125,9 +92,9 @@ public class UnboundedSourceWrapperTest {
     // elements later.
     TestCountingSource source = new TestCountingSource(numElements);
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, 4);
+        new UnboundedSourceWrapper<>(options, source, numSplits);
 
-    assertEquals(4, flinkWrapper.getSplitSources().size());
+    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
     StreamSource<WindowedValue<
         KV<Integer, Integer>>,
@@ -135,8 +102,7 @@ public class UnboundedSourceWrapperTest {
             KV<Integer, Integer>,
             TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
 
-    setupSourceOperator(sourceOperator);
-
+    setupSourceOperator(sourceOperator, numTasks);
 
     try {
       sourceOperator.run(checkpointLock,
@@ -163,6 +129,9 @@ public class UnboundedSourceWrapperTest {
             }
           });
     } catch (SuccessException e) {
+
+      assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+
       // success
       return;
     }
@@ -186,9 +155,9 @@ public class UnboundedSourceWrapperTest {
     // elements later.
     TestCountingSource source = new TestCountingSource(numElements);
     UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, 1);
+        new UnboundedSourceWrapper<>(options, source, numSplits);
 
-    assertEquals(1, flinkWrapper.getSplitSources().size());
+    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
 
     StreamSource<
         WindowedValue<KV<Integer, Integer>>,
@@ -196,7 +165,7 @@ public class UnboundedSourceWrapperTest {
             KV<Integer, Integer>,
             TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
 
-    setupSourceOperator(sourceOperator);
+    setupSourceOperator(sourceOperator, numTasks);
 
     final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
 
@@ -241,9 +210,9 @@ public class UnboundedSourceWrapperTest {
     TestCountingSource restoredSource = new TestCountingSource(numElements);
     UnboundedSourceWrapper<
         KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-        new UnboundedSourceWrapper<>(options, restoredSource, 1);
+        new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
 
-    assertEquals(1, restoredFlinkWrapper.getSplitSources().size());
+    assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
 
     StreamSource<
         WindowedValue<KV<Integer, Integer>>,
@@ -252,7 +221,7 @@ public class UnboundedSourceWrapperTest {
             TestCountingSource.CounterMark>> restoredSourceOperator =
         new StreamSource<>(restoredFlinkWrapper);
 
-    setupSourceOperator(restoredSourceOperator);
+    setupSourceOperator(restoredSourceOperator, numTasks);
 
     // restore snapshot
     restoredFlinkWrapper.restoreState(snapshot);
@@ -289,6 +258,8 @@ public class UnboundedSourceWrapperTest {
       readSecondBatchOfElements = true;
     }
 
+    assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+
     assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
 
     // verify that we saw all NUM_ELEMENTS elements
@@ -296,13 +267,13 @@ public class UnboundedSourceWrapperTest {
   }
 
   @SuppressWarnings("unchecked")
-  private static <T> void setupSourceOperator(StreamSource<T, ?> operator) {
+  private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
     ExecutionConfig executionConfig = new ExecutionConfig();
     StreamConfig cfg = new StreamConfig(new Configuration());
 
     cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
+    Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
 
     StreamTask<?, ?> mockTask = mock(StreamTask.class);
     when(mockTask.getName()).thenReturn("Mock Task");


Mime
View raw message