beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From echauc...@apache.org
Subject [beam] 24/45: Applying a groupByKey prevents, for some reason, the Spark structured streaming framework from casting data to Row, which would make it impossible to deserialize without the coder shipped with the data. For performance reasons (to avoid memory consumption and having to deserialize), we do not ship coder + data. Also add a mapPartitions before the GBK to avoid shuffling
Date Tue, 09 Jul 2019 13:18:41 GMT
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit e77e95112ab59da871a8c082f9654747fcccb3a9
Author: Etienne Chauchot <echauchot@apache.org>
AuthorDate: Wed Jun 26 17:44:59 2019 +0200

    Applying a groupByKey prevents, for some reason, the Spark structured streaming framework from
casting data to Row, which would make it impossible to deserialize without the coder shipped with
the data. For performance reasons (to avoid memory consumption and having to deserialize), we do
not ship coder + data. Also add a mapPartitions before the GBK to avoid shuffling
---
 .../batch/AggregatorCombinerGlobally.java          |  5 ++-
 .../batch/CombineGloballyTranslatorBatch.java      | 44 ++++++++++++++++++----
 2 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
index d3ad62c..c516629 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombinerGlobally.java
@@ -46,7 +46,7 @@ import scala.Tuple2;
  * */
 
 class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindow>
-    extends Aggregator<WindowedValue<InputT>, Iterable<WindowedValue<AccumT>>,
Iterable<WindowedValue<OutputT>>> {
+    extends Aggregator<Tuple2<Integer, WindowedValue<InputT>>, Iterable<WindowedValue<AccumT>>,
Iterable<WindowedValue<OutputT>>> {
 
   private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
   private WindowingStrategy<InputT, W> windowingStrategy;
@@ -63,8 +63,9 @@ class AggregatorCombinerGlobally<InputT, AccumT, OutputT, W extends BoundedWindo
   }
 
   @Override public Iterable<WindowedValue<AccumT>> reduce(Iterable<WindowedValue<AccumT>>
accumulators,
-      WindowedValue<InputT> input) {
+      Tuple2<Integer, WindowedValue<InputT>> inputTupe) {
 
+    WindowedValue<InputT> input = inputTupe._2;
     //concatenate accumulators windows and input windows and merge the windows
     Collection<W> inputWindows = (Collection<W>)input.getWindows();
     Set<W> windows = collectAccumulatorsWindows(accumulators);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
index f29b2c5..9219e3f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineGloballyTranslatorBatch.java
@@ -17,18 +17,23 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.KeyValueGroupedDataset;
+import scala.Tuple2;
 
 /**
  * By default Combine.globally is translated as a composite transform that does a Pardo (to
key the
@@ -54,12 +59,37 @@ class CombineGloballyTranslatorBatch<InputT, AccumT, OutputT>
     WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
     Dataset<WindowedValue<InputT>> inputDataset = context.getDataset(input);
 
-    Dataset<Row> combinedRowDataset =
-        inputDataset.agg(new AggregatorCombinerGlobally<>(combineFn, windowingStrategy).toColumn());
+    // applying a groupByKey avoids for some reason that the spark structured streaming fmwk
+    // casts data to Row which makes it impossible to deserialize without
+    // the coder shipped into the data. For performance reasons
+    // (avoid memory consumption and having to deserialize), we do not ship coder + data.
+
+    // We do not want to shuffle data during groupByKey, we cannot get the number of partitions
for
+    // the input dataset without triggering a costly operation (conversion to rdd) so we
cannot use spark Hashpartitioner
+    // so we apply a key to each input dataset partition and then trigger a GBK that should
not shuffle data.
+
+    Dataset<Tuple2<Integer, WindowedValue<InputT>>> keyedDataset = inputDataset
+        .mapPartitions((MapPartitionsFunction<WindowedValue<InputT>, Tuple2<Integer,
WindowedValue<InputT>>>) inputTIterator -> {
+          List<Tuple2<Integer, WindowedValue<InputT>>> result = new ArrayList<>();
+          Random random = new Random();
+          while (inputTIterator.hasNext()) {
+            result.add(Tuple2.apply(random.nextInt(), inputTIterator.next()));
+          }
+          return result.iterator();
+        }, EncoderHelpers.tuple2Encoder());
+
+    KeyValueGroupedDataset<Integer, Tuple2<Integer, WindowedValue<InputT>>>
groupedDataset = keyedDataset
+        .groupByKey(
+            (MapFunction<Tuple2<Integer, WindowedValue<InputT>>, Integer>)
value -> value._1(),
+            EncoderHelpers.windowedValueEncoder());
+
+    Dataset<Tuple2<Integer, Iterable<WindowedValue<OutputT>>>> combinedDataset
= groupedDataset
+        .agg(new AggregatorCombinerGlobally<>(combineFn, windowingStrategy).toColumn());
+
+    Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset = combinedDataset.map(
+        (MapFunction<Tuple2<Integer, Iterable<WindowedValue<OutputT>>>,
Iterable<WindowedValue<OutputT>>>) value -> value._2,
+        EncoderHelpers.genericEncoder());
 
-    Dataset<Iterable<WindowedValue<OutputT>>> accumulatedDataset =
-        combinedRowDataset.map(
-            RowHelpers.extractObjectFromRowMapFunction(), EncoderHelpers.genericEncoder());
     Dataset<WindowedValue<OutputT>> outputDataset = accumulatedDataset.flatMap(
         (FlatMapFunction<Iterable<WindowedValue<OutputT>>, WindowedValue<OutputT>>)
             windowedValues -> windowedValues.iterator(), EncoderHelpers.windowedValueEncoder());


Mime
View raw message