beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-777] KafkaIO Test should handle reader.start() better.
Date Wed, 19 Oct 2016 07:20:04 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master dde8e35ca -> ea04e618e


[BEAM-777] KafkaIO Test should handle reader.start() better.

KafkaIOTest : start() can return false


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2c8ade83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2c8ade83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2c8ade83

Branch: refs/heads/master
Commit: 2c8ade83b2104ecd7f8098b18dd45a0fd8b6cc9f
Parents: dde8e35
Author: Raghu Angadi <rangadi@google.com>
Authored: Tue Oct 18 21:46:18 2016 -0700
Committer: Sela <ansela@paypal.com>
Committed: Wed Oct 19 10:06:45 2016 +0300

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 26 ++++++++------------
 1 file changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2c8ade83/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 67aa675..2f3c524 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -389,7 +389,10 @@ public class KafkaIOTest {
   // Kafka records are read in a separate thread inside the reader. As a result advance() might not
   // read any records even from the mock consumer, especially for the first record.
   // This is a helper method to loop until we read a record.
-  private static void advanceOnce(UnboundedReader<?> reader) throws IOException {
+  private static void advanceOnce(UnboundedReader<?> reader, boolean isStarted) throws IOException {
+    if (!isStarted && reader.start()) {
+      return;
+    }
     while (!reader.advance()) {
       // very rarely will there be more than one attempts.
       // In case of a bug we might end up looping forever, and test will fail with a timeout.
@@ -418,9 +421,8 @@ public class KafkaIOTest {
     final int numToSkip = 20; // one from each partition.
 
     // advance numToSkip elements
-    reader.start();
-    for (int l = 1; l < numToSkip; ++l) {
-      advanceOnce(reader);
+    for (int i = 0; i < numToSkip; ++i) {
+      advanceOnce(reader, i > 0);
     }
 
     // Confirm that we get the expected element in sequence before checkpointing.
@@ -435,13 +437,10 @@ public class KafkaIOTest {
     // Confirm that we get the next elements in sequence.
     // This also confirms that Reader interleaves records from each partitions by the reader.
 
-    reader.start();
     for (int i = numToSkip; i < numElements; i++) {
+      advanceOnce(reader, i > numToSkip);
       assertEquals(i, (long) reader.getCurrent().getKV().getValue());
       assertEquals(i, reader.getCurrentTimestamp().getMillis());
-      if ((i + 1) < numElements) {
-        advanceOnce(reader);
-      }
     }
   }
 
@@ -460,9 +459,8 @@ public class KafkaIOTest {
 
     UnboundedReader<KafkaRecord<Integer, Long>> reader = source.createReader(null, null);
 
-    reader.start();
-    for (int l = 1; l < initialNumElements; ++l) {
-      advanceOnce(reader);
+    for (int l = 0; l < initialNumElements; ++l) {
+      advanceOnce(reader, l > 0);
     }
 
     // Checkpoint and restart, and confirm that the source continues correctly.
@@ -490,19 +488,15 @@ public class KafkaIOTest {
 
     reader = source.createReader(null, mark);
 
-    reader.start();
-
     // Verify in any order. As the partitions are unevenly read, the returned records are not in a
     // simple order. Note that testUnboundedSourceCheckpointMark() verifies round-robin oder.
 
     List<Long> expected = new ArrayList<>();
     List<Long> actual = new ArrayList<>();
     for (long i = initialNumElements; i < numElements; i++) {
+      advanceOnce(reader, i > initialNumElements);
       expected.add(i);
       actual.add(reader.getCurrent().getKV().getValue());
-      if ((i + 1) < numElements) {
-        advanceOnce(reader);
-      }
     }
     assertThat(actual, IsIterableContainingInAnyOrder.containsInAnyOrder(expected.toArray()));
   }


Mime
View raw message