beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 02/03: Fix accumulators initialization in Combine that prevented CombineGlobally from working.
Date Tue, 09 Jul 2019 13:00:01 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f125725f0bc19dd260a9953f621efb92af83f9d9
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Tue Jul 9 14:44:54 2019 +0200

    Fix accumulators initialization in Combine that prevented CombineGlobally from working.
---
 .../translation/batch/AggregatorCombiner.java      | 61 ++++------------------
 .../batch/CombinePerKeyTranslatorBatch.java        |  1 +
 .../translation/batch/CombineTest.java             |  1 -
 3 files changed, 11 insertions(+), 52 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
index e005c6f..001fe43 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
@@ -39,8 +39,6 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
 import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.expressions.Aggregator;
-import org.joda.time.Instant;
-import scala.Tuple2;
 
 /** An {@link Aggregator} for the Spark Batch Runner.
  * The accumulator is a {@code Iterable<WindowedValue<AccumT>> because an {@code
InputT} can be in multiple windows. So, when accumulating {@code InputT} values, we create
one accumulator per input window.
@@ -63,56 +61,16 @@ class AggregatorCombiner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
     return new ArrayList<>();
   }
 
+  private Iterable<WindowedValue<AccumT>> createAccumulator(WindowedValue<KV<K,
InputT>> inputWv) {
+    AccumT accumulator = combineFn.createAccumulator();
+    combineFn.addInput(accumulator, inputWv.getValue().getValue());
+    return Lists.newArrayList(WindowedValue
+        .of(accumulator, inputWv.getTimestamp(), inputWv.getWindows(),
+            inputWv.getPane()));
+  }
   @Override public Iterable<WindowedValue<AccumT>> reduce(Iterable<WindowedValue<AccumT>>
accumulators,
       WindowedValue<KV<K, InputT>> inputWv) {
-
-    KV<K, InputT> inputKv = inputWv.getValue();
-    //concatenate accumulators windows and input windows and merge the windows
-    Collection<W> inputWindows = (Collection<W>)inputWv.getWindows();
-    Set<W> windows = collectAccumulatorsWindows(accumulators);
-    windows.addAll(inputWindows);
-    Map<W, W> windowToMergeResult;
-    try {
-      windowToMergeResult = mergeWindows(windowingStrategy, windows);
-    } catch (Exception e) {
-      throw new RuntimeException("Unable to merge accumulators windows and input windows",
e);
-    }
-
-    // iterate through the input windows and for each, create an accumulator with the merged
window
-    // associated to it and call addInput with the accumulator.
-    // Maintain a map of the accumulators for use as output
-    Map<W, Tuple2<AccumT, Instant>> windowToAccumAndInstant = new HashMap<>();
-    for (W inputWindow:inputWindows) {
-      W mergedWindow = windowToMergeResult.get(inputWindow);
-      mergedWindow = mergedWindow == null ? inputWindow : mergedWindow;
-      Tuple2<AccumT, Instant> accumAndInstant = windowToAccumAndInstant.get(mergedWindow);
-      // if there is no accumulator associated with this window yet, create one
-      if (accumAndInstant == null) {
-        AccumT accum = combineFn.addInput(combineFn.createAccumulator(), inputKv.getValue());
-        Instant windowTimestamp =
-            timestampCombiner.assign(
-                mergedWindow, windowingStrategy.getWindowFn().getOutputTime(inputWv.getTimestamp(),
mergedWindow));
-        accumAndInstant = new Tuple2<>(accum, windowTimestamp);
-      } else {
-        AccumT updatedAccum =
-            combineFn.addInput(accumAndInstant._1, inputKv.getValue());
-        Instant updatedTimestamp = timestampCombiner.combine(accumAndInstant._2, timestampCombiner
-            .assign(mergedWindow,
-                windowingStrategy.getWindowFn().getOutputTime(inputWv.getTimestamp(), mergedWindow)));
-        accumAndInstant = new Tuple2<>(updatedAccum, updatedTimestamp);
-      }
-      windowToAccumAndInstant.put(mergedWindow, accumAndInstant);
-    }
-    // output the accumulators map
-    //TODO perf: to avoid memory allocation it is authorized to modify accumulators argument
-    List<WindowedValue<AccumT>> result = new ArrayList<>();
-    for (Map.Entry<W, Tuple2<AccumT, Instant>> entry : windowToAccumAndInstant.entrySet())
{
-      AccumT accumulator = entry.getValue()._1;
-      Instant windowTimestamp = entry.getValue()._2;
-      W window = entry.getKey();
-      result.add(WindowedValue.of(accumulator, windowTimestamp, window, PaneInfo.NO_FIRING));
-    }
-    return result;
+    return merge(accumulators, createAccumulator(inputWv));
   }
 
   @Override public Iterable<WindowedValue<AccumT>> merge(
@@ -182,7 +140,8 @@ class AggregatorCombiner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
       // an accumulator has only one window associated to it.
       W accumulatorWindow = (W) accumulator.getWindows().iterator().next();
       windows.add(accumulatorWindow);
-    } return windows;
+    }
+    return windows;
   }
 
   private Map<W, W> mergeWindows(WindowingStrategy<InputT, W> windowingStrategy,
Set<W> windows)
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 12e5944..6ba28ea 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -63,6 +63,7 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
         new AggregatorCombiner<K, InputT, AccumT, OutputT, BoundedWindow>(combineFn,
             windowingStrategy).toColumn());
 
+    //expand the list into separate elements and put the key back into the elements
     Dataset<WindowedValue<KV<K, OutputT>>> outputDataset = combinedDataset.flatMap(
         (FlatMapFunction<Tuple2<K, Iterable<WindowedValue<OutputT>>>,
WindowedValue<KV<K, OutputT>>>) tuple2 -> {
           K key = tuple2._1;
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
index ef4468b..3505da5 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
@@ -52,7 +52,6 @@ public class CombineTest implements Serializable {
     p = Pipeline.create(options);
   }
 
-  @Ignore
   @Test
   public void testCombineGlobally() {
     PCollection<Integer> input =


Mime
View raw message