beam-commits mailing list archives

From aljos...@apache.org
Subject [1/2] beam git commit: Fix race condition when outputting pushed-back elements in Flink Runner
Date Sun, 04 Jun 2017 06:14:53 GMT
Repository: beam
Updated Branches:
  refs/heads/master 43c44232d -> ef56ea495


Fix race condition when outputting pushed-back elements in Flink Runner

This affects the DoFnOperator of the Flink Streaming Runner. The recent fix
that emits pushed-back data when a watermark arrives on the first input
placed the emission at the end of the method, after the watermark has
already been processed. The emitted data can therefore become late. The fix
is to move the pushed-back element emission to the start of the method.
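
The ordering problem described above can be illustrated with a small
standalone sketch. This is not the DoFnOperator code: the class
PushedBackEmissionSketch, the Downstream consumer, and the run method are
hypothetical names made up for illustration, and the lateness rule (an
element is late if its timestamp is below the last watermark the consumer
saw) is a deliberate simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PushedBackEmissionSketch {

  /** Hypothetical downstream consumer; not part of Beam or Flink. */
  public static final class Downstream {
    public long watermark = Long.MIN_VALUE;
    public final List<Long> late = new ArrayList<>();
    public final List<Long> onTime = new ArrayList<>();

    /** Simplified rule: an element behind the last seen watermark counts as late. */
    public void element(long timestamp) {
      if (timestamp < watermark) {
        late.add(timestamp);
      } else {
        onTime.add(timestamp);
      }
    }

    public void watermark(long newWatermark) {
      watermark = Math.max(watermark, newWatermark);
    }
  }

  /**
   * Simulates handling one watermark while elements are still pushed back.
   * emitFirst = true mirrors the ordering after this commit (emit, then advance);
   * emitFirst = false mirrors the ordering before it (advance, then emit).
   */
  public static Downstream run(List<Long> pushedBack, long mark, boolean emitFirst) {
    Downstream downstream = new Downstream();
    if (emitFirst) {
      for (long timestamp : pushedBack) {
        downstream.element(timestamp);
      }
    }
    downstream.watermark(mark);
    if (!emitFirst) {
      for (long timestamp : pushedBack) {
        downstream.element(timestamp);
      }
    }
    return downstream;
  }

  public static void main(String[] args) {
    List<Long> pushedBack = Arrays.asList(10L, 20L, 30L);
    System.out.println("late, emission at end:   " + run(pushedBack, 100L, false).late.size());
    System.out.println("late, emission at start: " + run(pushedBack, 100L, true).late.size());
  }
}
```

Under these assumptions, emitting after the watermark marks all three
elements as late, while emitting before it marks none.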


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d17c0132
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d17c0132
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d17c0132

Branch: refs/heads/master
Commit: d17c013240a14b12992cf00f30e5151c7e97f360
Parents: 43c4423
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Fri Jun 2 15:57:01 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sun Jun 4 08:08:14 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java          | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d17c0132/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index e473046..594fe0e 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -445,6 +445,15 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   @Override
   public void processWatermark1(Watermark mark) throws Exception {
+    // We do the check here because we are guaranteed to at least get the +Inf watermark on the
+    // main input when the job finishes.
+    if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // this means we will never see any more side input
+      // we also do the check here because we might have received the side-input MAX watermark
+      // before receiving any main-input data
+      emitAllPushedBackData();
+    }
+
     if (keyCoder == null) {
       setCurrentInputWatermark(mark.getTimestamp());
       long potentialOutputWatermark =
@@ -476,15 +485,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       }
       pushbackDoFnRunner.finishBundle();
     }
-
-    // We do the check here because we are guaranteed to at least get the +Inf watermark on the
-    // main input when the job finishes.
-    if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-      // this means we will never see any more side input
-      // we also do the check here because we might have received the side-input MAX watermark
-      // before receiving any main-input data
-      emitAllPushedBackData();
-    }
   }
 
   @Override

