beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/5] incubator-beam git commit: [flink] improve lifecycle of ParDoBoundWrapper
Date Wed, 08 Jun 2016 16:10:55 GMT
[flink] improve lifecycle of ParDoBoundWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d10ae23c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d10ae23c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d10ae23c

Branch: refs/heads/master
Commit: d10ae23c9bc9529d04d02951bfed01bbf2957773
Parents: ffbfc66
Author: Maximilian Michels <mxm@apache.org>
Authored: Mon Jun 6 12:40:50 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Jun 8 15:19:50 2016 +0200

----------------------------------------------------------------------
 .../streaming/FlinkAbstractParDoWrapper.java      | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d10ae23c/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index bb6ed67..117303c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -66,15 +66,21 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL>
extends RichFl
     this.windowingStrategy = windowingStrategy;
   }
 
-  private void initContext(DoFn<IN, OUTDF> function, Collector<WindowedValue<OUTFL>>
outCollector) {
-    if (this.context == null) {
-      this.context = new DoFnProcessContext(function, outCollector);
-    }
+  @Override
+  public void open(Configuration parameters) throws Exception {
+    this.doFn.startBundle(context);
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.doFn.finishBundle(context);
   }
 
   @Override
   public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>>
out) throws Exception {
-    this.initContext(doFn, out);
+    if (this.context == null) {
+      this.context = new DoFnProcessContext(doFn, out);
+    }
 
     // for each window the element belongs to, create a new copy here.
     Collection<? extends BoundedWindow> windows = value.getWindows();
@@ -90,9 +96,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL>
extends RichFl
 
   private void processElement(WindowedValue<IN> value) throws Exception {
     this.context.setElement(value);
-    this.doFn.startBundle(context);
     doFn.processElement(context);
-    this.doFn.finishBundle(context);
   }
 
   private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {


Mime
View raw message