beam-commits mailing list archives

From aljos...@apache.org
Subject [1/2] incubator-beam git commit: [BEAM-862] Make Aggregator Creation Idempotent
Date Fri, 28 Oct 2016 20:49:08 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master e82c5d224 -> beccdc686


[BEAM-862] Make Aggregator Creation Idempotent

The problem was that the DoFnInvoker was invoking
createAggregatorForDoFn in the AggregatorFactory several times and
Flink only allows adding each aggregator once. This now adds a check
for whether an aggregator exists already.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/837bb2b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/837bb2b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/837bb2b7

Branch: refs/heads/master
Commit: 837bb2b71e515c9170fa2c031c86a618b085b249
Parents: e82c5d2
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Oct 27 15:31:20 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Oct 28 22:48:04 2016 +0200

----------------------------------------------------------------------
 .../translation/wrappers/streaming/DoFnOperator.java      | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/837bb2b7/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index fb444e0..a29664b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -174,10 +174,16 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
           ExecutionContext.StepContext stepContext,
           String aggregatorName,
           Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+
+        @SuppressWarnings("unchecked")
         SerializableFnAggregatorWrapper<InputT, OutputT> result =
-            new SerializableFnAggregatorWrapper<>(combine);
+            (SerializableFnAggregatorWrapper<InputT, OutputT>)
+                getRuntimeContext().getAccumulator(aggregatorName);
 
-        getRuntimeContext().addAccumulator(aggregatorName, result);
+        if (result == null) {
+          result = new SerializableFnAggregatorWrapper<>(combine);
+          getRuntimeContext().addAccumulator(aggregatorName, result);
+        }
         return result;
       }
     };
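
The hunk above amounts to a get-or-create check: look the aggregator up by name and register a new one only if none exists. Below is a minimal, self-contained sketch of that pattern. FakeRuntimeContext, Counter, and getOrCreate are hypothetical stand-ins introduced for illustration, not Beam or Flink classes. The stand-in assumes the behavior described in the commit message: adding the same name twice is rejected, and a lookup for an unknown name returns null.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical aggregator type; stands in for SerializableFnAggregatorWrapper.
class Counter {
  long value;
}

// Hypothetical stand-in for the runtime context. Assumption: a second add
// under the same name is rejected, and get returns null for unknown names.
class FakeRuntimeContext {
  private final Map<String, Counter> accumulators = new HashMap<>();

  Counter getAccumulator(String name) {
    return accumulators.get(name);
  }

  void addAccumulator(String name, Counter accumulator) {
    if (accumulators.containsKey(name)) {
      throw new UnsupportedOperationException(
          "The accumulator '" + name + "' already exists.");
    }
    accumulators.put(name, accumulator);
  }
}

class IdempotentAggregatorSketch {
  // Safe to call repeatedly with the same name: the first call registers
  // a new Counter, later calls return the one that was already registered.
  static Counter getOrCreate(FakeRuntimeContext ctx, String name) {
    Counter result = ctx.getAccumulator(name);
    if (result == null) {
      result = new Counter();
      ctx.addAccumulator(name, result);
    }
    return result;
  }
}
```

Calling getOrCreate twice with the same name returns the same Counter instance, whereas calling addAccumulator twice directly would throw. The sketch is not thread-safe; it assumes the factory is invoked from a single thread.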

