beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 35/45: Fixed immutable list bug
Date Tue, 09 Jul 2019 13:18:52 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 78270bfa492c7e08b0615322f51b3c8b02882681
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Thu Jul 4 15:04:51 2019 +0200

    Fixed immutable list bug
---
 .../structuredstreaming/translation/batch/AggregatorCombiner.java      | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
index 138e5c2..e005c6f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.expressions.Aggregator;
 import org.joda.time.Instant;
@@ -137,7 +138,7 @@ class AggregatorCombiner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
       mergedWindowForAccumulator = (mergedWindowForAccumulator == null) ? (W)accumulatorWindow
: mergedWindowForAccumulator;
 
       if (mergedWindowToAccumulators.get(mergedWindowForAccumulator) == null){
-        mergedWindowToAccumulators.put(mergedWindowForAccumulator, Collections.singletonList(accumulator));
+        mergedWindowToAccumulators.put(mergedWindowForAccumulator, Lists.newArrayList(accumulator));
       }
       else {
         mergedWindowToAccumulators.get(mergedWindowForAccumulator).add(accumulator);


Mime
View raw message