beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-1623] Transform Reshuffle directly in Spark runner
Date Mon, 06 Mar 2017 09:49:12 GMT
Repository: beam
Updated Branches:
  refs/heads/master 69d951225 -> 34b38ef95


[BEAM-1623] Transform Reshuffle directly in Spark runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8bc618e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8bc618e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8bc618e

Branch: refs/heads/master
Commit: d8bc618edafd07ae8e0ec692fc7f3df7395b876e
Parents: 69d9512
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Sun Mar 5 07:15:32 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Mon Mar 6 11:19:22 2017 +0200

----------------------------------------------------------------------
 .../translation/GroupCombineFunctions.java      | 22 ++++++++++++
 .../spark/translation/TransformTranslator.java  | 38 +++++++++++++++-----
 .../spark/translation/TranslationUtils.java     | 10 ++++++
 .../streaming/StreamingTransformTranslator.java | 36 +++++++++++++++++++
 4 files changed, 97 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index 1e879ce..b2a589d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -203,4 +203,26 @@ public class GroupCombineFunctions {
 
     return accumulatedBytes.mapToPair(CoderHelpers.fromByteFunction(keyCoder, iterAccumCoder));
   }
+
+  /**
+   * An implementation of
+   * {@link org.apache.beam.sdk.util.Reshuffle} for the Spark runner.
+   *
+   * <p>Each element is serialized with the given coders, redistributed by a Spark
+   * {@code repartition} into as many partitions as {@code rdd} already has, and then
+   * deserialized and rewrapped as {@code WindowedValue<KV<K, V>>}.
+   *
+   * @param rdd the elements to reshuffle
+   * @param keyCoder coder used to serialize and deserialize each element's key
+   * @param wvCoder coder used to serialize and deserialize each element's value together
+   *     with its window metadata
+   * @return the deserialized elements after the shuffle
+   */
+  public static <K, V> JavaRDD<WindowedValue<KV<K, V>>> reshuffle(
+      JavaRDD<WindowedValue<KV<K, V>>> rdd,
+      Coder<K> keyCoder,
+      WindowedValueCoder<V> wvCoder) {
+
+    // Use coders to convert objects in the PCollection to byte arrays, so they
+    // can be transferred over the network for the shuffle.
+    return rdd
+        // Move each element's window metadata into its value (presumably timestamp and
+        // windows, per ReifyTimestampsAndWindowsFunction's name), then drop the outer
+        // wrapper, leaving KV<K, WindowedValue<V>>.
+        .map(new ReifyTimestampsAndWindowsFunction<K, V>())
+        .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
+        // Convert to Spark pairs and encode key and windowed value to bytes.
+        .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction())
+        .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))
+        // The shuffle itself. The partition count is read from the input RDD, so it
+        // is unchanged; no key-based Partitioner is supplied.
+        .repartition(rdd.getNumPartitions())
+        // Decode, convert back to Beam KVs, and move the WindowedValue wrapper from
+        // the value back onto the whole KV.
+        .mapToPair(CoderHelpers.fromByteFunction(keyCoder, wvCoder))
+        .map(TranslationUtils.<K, WindowedValue<V>>fromPairFunction())
+        .map(TranslationUtils.<K, V>toKVByWindowInValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index a4939b9..0ae7313 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -69,6 +69,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -318,15 +319,7 @@ public final class TransformTranslator {
                         return sparkCombineFn.extractOutput(iter);
                       }
                 }).map(TranslationUtils.<K, WindowedValue<OutputT>>fromPairFunction())
-                  .map(new Function<KV<K, WindowedValue<OutputT>>,
-                      WindowedValue<KV<K, OutputT>>>() {
-                        @Override
-                          public WindowedValue<KV<K, OutputT>> call(
-                              KV<K, WindowedValue<OutputT>> kv) throws Exception
{
-                                WindowedValue<OutputT> wv = kv.getValue();
-                                return wv.withValue(KV.of(kv.getKey(), wv.getValue()));
-                              }
-                      });
+                  .map(TranslationUtils.<K, OutputT>toKVByWindowInValue());
 
         context.putDataset(transform, new BoundedDataset<>(outRdd));
       }
@@ -735,6 +728,32 @@ public final class TransformTranslator {
     };
   }
 
+  /**
+   * Translates {@link Reshuffle} for bounded (batch) input: the input RDD is handed to
+   * {@link GroupCombineFunctions#reshuffle} and the result is registered as the output.
+   */
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K, V>> reshuffle() {
+    return new TransformEvaluator<Reshuffle<K, V>>() {
+      @Override
+      public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) {
+        @SuppressWarnings("unchecked")
+        BoundedDataset<KV<K, V>> inputDataset =
+            (BoundedDataset<KV<K, V>>) context.borrowDataset(transform);
+        JavaRDD<WindowedValue<KV<K, V>>> inputRdd = inputDataset.getRDD();
+
+        // The shuffle coders are derived from the input's KV coder and its WindowFn.
+        @SuppressWarnings("unchecked")
+        KvCoder<K, V> kvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder();
+        @SuppressWarnings("unchecked")
+        WindowingStrategy<?, W> strategy =
+            (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
+        @SuppressWarnings("unchecked")
+        WindowFn<Object, W> fn = (WindowFn<Object, W>) strategy.getWindowFn();
+        WindowedValue.WindowedValueCoder<V> windowedCoder =
+            WindowedValue.FullWindowedValueCoder.of(kvCoder.getValueCoder(), fn.windowCoder());
+
+        JavaRDD<WindowedValue<KV<K, V>>> outputRdd =
+            GroupCombineFunctions.reshuffle(inputRdd, kvCoder.getKeyCoder(), windowedCoder);
+        context.putDataset(transform, new BoundedDataset<>(outputRdd));
+      }
+    };
+  }
+
   private static final Map<Class<? extends PTransform>, TransformEvaluator<?>> EVALUATORS = Maps
       .newHashMap();
 
@@ -753,6 +772,7 @@ public final class TransformTranslator {
     EVALUATORS.put(View.AsIterable.class, viewAsIter());
     EVALUATORS.put(View.CreatePCollectionView.class, createPCollView());
     EVALUATORS.put(Window.Assign.class, window());
+    EVALUATORS.put(Reshuffle.class, reshuffle());
     // mostly test evaluators
     EVALUATORS.put(StorageLevelPTransform.class, storageLevel());
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 158593e..f2b3418 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -159,6 +159,16 @@ public final class TranslationUtils {
         };
       }
 
+  /** Extract window from a {@link KV} with {@link WindowedValue} value. */
+  static <K, V> Function<KV<K, WindowedValue<V>>, WindowedValue<KV<K,
V>>> toKVByWindowInValue() {
+    return new Function<KV<K, WindowedValue<V>>, WindowedValue<KV<K,
V>>>() {
+      @Override public WindowedValue<KV<K, V>> call(KV<K, WindowedValue<V>>
kv) throws Exception {
+        WindowedValue<V> wv = kv.getValue();
+        return wv.withValue(KV.of(kv.getKey(), wv.getValue()));
+      }
+    };
+  }
+
   /**
    * A utility class to filter {@link TupleTag}s.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/d8bc618e/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index 628b713..31307cc 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -73,6 +73,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.CombineFnUtil;
+import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -445,6 +446,40 @@ final class StreamingTransformTranslator {
     };
   }
 
+  private static <K, V, W extends BoundedWindow> TransformEvaluator<Reshuffle<K,
V>> reshuffle() {
+    return new TransformEvaluator<Reshuffle<K, V>>() {
+      @Override
+      public void evaluate(Reshuffle<K, V> transform, EvaluationContext context) {
+        @SuppressWarnings("unchecked") UnboundedDataset<KV<K, V>> inputDataset
=
+            (UnboundedDataset<KV<K, V>>) context.borrowDataset(transform);
+        List<Integer> streamSources = inputDataset.getStreamSources();
+        JavaDStream<WindowedValue<KV<K, V>>> dStream = inputDataset.getDStream();
+        @SuppressWarnings("unchecked")
+        final KvCoder<K, V> coder = (KvCoder<K, V>) context.getInput(transform).getCoder();
+        @SuppressWarnings("unchecked")
+        final WindowingStrategy<?, W> windowingStrategy =
+            (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
+        @SuppressWarnings("unchecked")
+        final WindowFn<Object, W> windowFn = (WindowFn<Object, W>) windowingStrategy.getWindowFn();
+
+        final WindowedValue.WindowedValueCoder<V> wvCoder =
+            WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
+
+        JavaDStream<WindowedValue<KV<K, V>>> reshuffledStream =
+            dStream.transform(new Function<JavaRDD<WindowedValue<KV<K, V>>>,
+                JavaRDD<WindowedValue<KV<K, V>>>>() {
+              @Override
+              public JavaRDD<WindowedValue<KV<K, V>>> call(
+                  JavaRDD<WindowedValue<KV<K, V>>> rdd) throws Exception
{
+                return GroupCombineFunctions.reshuffle(rdd, coder.getKeyCoder(), wvCoder);
+              }
+            });
+
+        context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources));
+      }
+    };
+  }
+
   private static final Map<Class<? extends PTransform>, TransformEvaluator<?>>
EVALUATORS =
       Maps.newHashMap();
 
@@ -457,6 +492,7 @@ final class StreamingTransformTranslator {
     EVALUATORS.put(CreateStream.class, createFromQueue());
     EVALUATORS.put(Window.Assign.class, window());
     EVALUATORS.put(Flatten.PCollections.class, flattenPColl());
+    EVALUATORS.put(Reshuffle.class, reshuffle());
   }
 
   /**


Mime
View raw message