beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/2] incubator-beam git commit: [flink] fix potential NPE in ParDoWrapper
Date Mon, 13 Jun 2016 12:58:21 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 60964b611 -> be05942da


[flink] fix potential NPE in ParDoWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a2abc6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a2abc6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a2abc6a2

Branch: refs/heads/master
Commit: a2abc6a249cdc4e6000d1539df6c3b5cde8d39b0
Parents: 60964b6
Author: Maximilian Michels <mxm@apache.org>
Authored: Fri Jun 10 14:26:45 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Mon Jun 13 14:56:54 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/FlinkAbstractParDoWrapper.java       | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2abc6a2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index a935011..3c37aa9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -70,18 +70,21 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
 
   @Override
   public void open(Configuration parameters) throws Exception {
-    this.doFn.startBundle(context);
   }
 
   @Override
   public void close() throws Exception {
-    this.doFn.finishBundle(context);
+    if (this.context != null) {
+      // we have initialized the context
+      this.doFn.finishBundle(this.context);
+    }
   }
 
   @Override
   public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
     if (this.context == null) {
       this.context = new DoFnProcessContext(doFn, out);
+      this.doFn.startBundle(this.context);
     }
 
     // for each window the element belongs to, create a new copy here.
@@ -98,7 +101,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
 
   private void processElement(WindowedValue<IN> value) throws Exception {
     this.context.setElement(value);
-    doFn.processElement(context);
+    doFn.processElement(this.context);
   }
 
   private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {


Mime
View raw message