beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Fix LateDataDroppingDoFnRunner to not increment the counter on reiteration.
Date Fri, 09 Dec 2016 22:47:58 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 05bb254b8 -> 1fab1521a


Fix LateDataDroppingDoFnRunner to not increment the counter on reiteration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7dbf16e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7dbf16e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7dbf16e

Branch: refs/heads/master
Commit: d7dbf16e9886e29224361781f7beb5c2087ff6d0
Parents: 05bb254
Author: Sam Whittle <samuelw@google.com>
Authored: Fri Dec 9 12:43:04 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Fri Dec 9 14:47:44 2016 -0800

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        | 25 +++++++++++++-------
 .../core/LateDataDroppingDoFnRunnerTest.java    |  3 +++
 2 files changed, 20 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7dbf16e/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index b6f700f..9bfe9ae 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -116,21 +116,30 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
                     }
                   });
             }});
+      Iterable<WindowedValue<InputT>> concatElements = Iterables.concat(windowsExpandedElements);
+
+      // Bump the counter separately since we don't want multiple iterations to
+      // increase it multiple times.
+      for (WindowedValue<InputT> input : concatElements) {
+        BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+        if (canDropDueToExpiredWindow(window)) {
+          // The element is too late for this window.
+          droppedDueToLateness.addValue(1L);
+          WindowTracing.debug(
+              "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
+              + "since too far behind inputWatermark:{}; outputWatermark:{}",
+              input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+              timerInternals.currentOutputWatermarkTime());
+        }
+      }
 
       Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
-          Iterables.concat(windowsExpandedElements),
+          concatElements,
           new Predicate<WindowedValue<InputT>>() {
             @Override
             public boolean apply(WindowedValue<InputT> input) {
               BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
               if (canDropDueToExpiredWindow(window)) {
-                // The element is too late for this window.
-                droppedDueToLateness.addValue(1L);
-                WindowTracing.debug(
-                    "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-                    + "since too far behind inputWatermark:{}; outputWatermark:{}",
-                    input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
-                    timerInternals.currentOutputWatermarkTime());
                 return false;
               } else {
                 return true;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7dbf16e/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
index 1cf05b6..3cd5d4a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
@@ -80,6 +80,9 @@ public class LateDataDroppingDoFnRunnerTest {
         createDatum(18, 18L));
     assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
     assertEquals(1, droppedDueToLateness.sum);
+    // Ensure that reiterating returns the same results and doesn't increment the counter again.
+    assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class)));
+    assertEquals(1, droppedDueToLateness.sum);
   }
 
   private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {


Mime
View raw message