beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [7/9] beam git commit: Tidy LateDataDroppingDoFnRunner
Date Thu, 22 Jun 2017 23:03:59 GMT
Tidy LateDataDroppingDoFnRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4e5db51
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4e5db51
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4e5db51

Branch: refs/heads/master
Commit: d4e5db51a025a831ddf4e3bc0e003caebabf647b
Parents: 497cfab
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Jun 22 11:56:53 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Jun 22 13:58:08 2017 -0700

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        | 33 ++++++++++----------
 1 file changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d4e5db51/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 1cf1509..28938c1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -134,26 +134,27 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWin
           // The element is too late for this window.
           droppedDueToLateness.inc();
           WindowTracing.debug(
-              "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
-              + "since too far behind inputWatermark:{}; outputWatermark:{}",
-              input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
+              "{}: Dropping element at {} for key:{}; window:{} "
+                  + "since too far behind inputWatermark:{}; outputWatermark:{}",
+              LateDataFilter.class.getSimpleName(),
+              input.getTimestamp(),
+              key,
+              window,
+              timerInternals.currentInputWatermarkTime(),
               timerInternals.currentOutputWatermarkTime());
         }
       }
 
-      Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
-          concatElements,
-          new Predicate<WindowedValue<InputT>>() {
-            @Override
-            public boolean apply(WindowedValue<InputT> input) {
-              BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
-              if (canDropDueToExpiredWindow(window)) {
-                return false;
-              } else {
-                return true;
-              }
-            }
-          });
+      Iterable<WindowedValue<InputT>> nonLateElements =
+          Iterables.filter(
+              concatElements,
+              new Predicate<WindowedValue<InputT>>() {
+                @Override
+                public boolean apply(WindowedValue<InputT> input) {
+                  BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+                  return !canDropDueToExpiredWindow(window);
+                }
+              });
       return nonLateElements;
     }
 


Mime
View raw message