beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] incubator-beam git commit: Revert "Update Watermarks even if a Reader is empty"
Date Tue, 08 Nov 2016 02:05:15 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 339dee954 -> 99505e125


Revert "Update Watermarks even if a Reader is empty"

This reverts commit ff7fe07be96de393b763e7b3d213734040aa3795.

Updated test appears to be broken.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2856fbf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2856fbf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2856fbf

Branch: refs/heads/master
Commit: e2856fbf076d34b7856391eafdfcfeb71bc6d7b2
Parents: 339dee9
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Nov 7 18:02:44 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Nov 7 18:02:44 2016 -0800

----------------------------------------------------------------------
 .../direct/UnboundedReadEvaluatorFactory.java    |  6 ++----
 .../UnboundedReadEvaluatorFactoryTest.java       | 19 ++++++-------------
 2 files changed, 8 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2856fbf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
index fb09b3e..e529088 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java
@@ -35,7 +35,6 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -144,13 +143,12 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
           // If the reader had no elements available, but the shard is not done, reuse it later
           resultBuilder.addUnprocessedElements(
               Collections.<WindowedValue<?>>singleton(
-                  WindowedValue.timestampedValueInGlobalWindow(
+                  element.withValue(
                       UnboundedSourceShard.of(
                           shard.getSource(),
                           shard.getDeduplicator(),
                           reader,
-                          shard.getCheckpoint()),
-                      reader.getWatermark())));
+                          shard.getCheckpoint()))));
         }
       } catch (IOException e) {
         if (reader != null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2856fbf/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
index 18c7cec..9a7fec3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -260,7 +260,6 @@ public class UnboundedReadEvaluatorFactoryTest {
         (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
             Iterables.getOnlyElement(result.getUnprocessedElements());
     secondEvaluator.processElement(residual);
-
     TransformResult secondResult = secondEvaluator.finishBundle();
 
     // Sanity check that nothing was output (The test would have to run for more than a day to do
@@ -269,14 +268,11 @@ public class UnboundedReadEvaluatorFactoryTest {
         secondOutput.commit(Instant.now()).getElements(),
         Matchers.<WindowedValue<Long>>emptyIterable());
 
-    // Test that even though the reader produced no outputs, there is still a residual shard with
-    // the updated watermark.
-    WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>> unprocessed =
-        (WindowedValue<UnboundedSourceShard<Long, TestCheckpointMark>>)
-            Iterables.getOnlyElement(secondResult.getUnprocessedElements());
-    assertThat(
-        unprocessed.getTimestamp(), Matchers.<ReadableInstant>greaterThan(residual.getTimestamp()));
-    assertThat(unprocessed.getValue().getExistingReader(), not(nullValue()));
+    // Test that even though the reader produced no outputs, there is still a residual shard.
+    UnboundedSourceShard<Long, TestCheckpointMark> residualShard =
+        (UnboundedSourceShard<Long, TestCheckpointMark>)
+            Iterables.getOnlyElement(secondResult.getUnprocessedElements()).getValue();
+    assertThat(residualShard.getExistingReader(), not(nullValue()));
   }
 
   @Test
@@ -381,8 +377,6 @@ public class UnboundedReadEvaluatorFactoryTest {
   }
 
   private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
-    private static int getWatermarkCalls = 0;
-
     static int readerClosedCount;
     static int readerAdvancedCount;
     private final Coder<T> coder;
@@ -453,8 +447,7 @@ public class UnboundedReadEvaluatorFactoryTest {
 
       @Override
       public Instant getWatermark() {
-        getWatermarkCalls++;
-        return new Instant(index + getWatermarkCalls);
+        return Instant.now();
       }
 
       @Override


Mime
View raw message