beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [04/50] [abbrv] incubator-beam git commit: Add a system property, dataflow.spark.directBroadcast, to allow pipelines to bypass coders for broadcasts.
Date Thu, 10 Mar 2016 20:58:29 GMT
Add a system property, dataflow.spark.directBroadcast, to allow pipelines to bypass coders
for broadcasts.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2d00b3bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2d00b3bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2d00b3bd

Branch: refs/heads/master
Commit: 2d00b3bd7c55ebde5ed3242b697749b111dfb9ce
Parents: 7cff304
Author: Tom White <tom@cloudera.com>
Authored: Thu Jul 9 10:40:18 2015 +0100
Committer: Tom White <tom@cloudera.com>
Committed: Thu Mar 10 11:15:14 2016 +0000

----------------------------------------------------------------------
 .../dataflow/spark/BroadcastHelper.java         | 102 ++++++++++++++-----
 .../dataflow/spark/TransformTranslator.java     |   2 +-
 2 files changed, 79 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2d00b3bd/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
index 6a26787..2622ce9 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
@@ -25,39 +25,93 @@ import org.apache.spark.broadcast.Broadcast;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class BroadcastHelper<T> implements Serializable {
+abstract class BroadcastHelper<T> implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
+  /**
+   * If the property <code>dataflow.spark.directBroadcast</code> is set to
+   * <code>true</code> then Spark serialization (Kryo) will be used to broadcast values
+   * in View objects. By default this property is not set, and values are coded using
+   * the appropriate {@link com.google.cloud.dataflow.sdk.coders.Coder}.
+   */
+  public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
 
-  private Broadcast<byte[]> bcast;
-  private final Coder<T> coder;
-  private transient T value;
+  private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
 
-  BroadcastHelper(T value, Coder<T> coder) {
-    this.value = value;
-    this.coder = coder;
+  public static <T> BroadcastHelper<T> create(T value, Coder<T> coder) {
+    if (Boolean.getBoolean(DIRECT_BROADCAST)) {
+      return new DirectBroadcastHelper<>(value);
+    }
+    return new CodedBroadcastHelper<>(value, coder);
   }
 
-  public synchronized T getValue() {
-    if (value == null) {
-      value = deserialize();
+  public abstract T getValue();
+
+  public abstract void broadcast(JavaSparkContext jsc);
+
+  /**
+   * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that relies on the underlying
+   * Spark serialization (Kryo) to broadcast values. This is appropriate when
+   * broadcasting very large values, since no copy of the object is made.
+   * @param <T>
+   */
+  static class DirectBroadcastHelper<T> extends BroadcastHelper<T> {
+    private Broadcast<T> bcast;
+    private transient T value;
+
+    DirectBroadcastHelper(T value) {
+      this.value = value;
+    }
+
+    public synchronized T getValue() {
+      if (value == null) {
+        value = bcast.getValue();
+      }
+      return value;
     }
-    return value;
-  }
 
-  public void broadcast(JavaSparkContext jsc) {
-    this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
+    public void broadcast(JavaSparkContext jsc) {
+      this.bcast = jsc.broadcast(value);
+    }
   }
 
-  private T deserialize() {
-    T val;
-    try {
-      val = coder.decode(new ByteArrayInputStream(bcast.value()), new Coder.Context(true));
-    } catch (IOException ioe) {
-      // this should not ever happen, log it if it does.
-      LOG.warn(ioe.getMessage());
-      val = null;
+  /**
+   * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that uses a
+   * {@link com.google.cloud.dataflow.sdk.coders.Coder} to encode values as byte arrays
+   * before broadcasting.
+   * @param <T>
+   */
+  static class CodedBroadcastHelper<T> extends BroadcastHelper<T> {
+    private Broadcast<byte[]> bcast;
+    private final Coder<T> coder;
+    private transient T value;
+
+    CodedBroadcastHelper(T value, Coder<T> coder) {
+      this.value = value;
+      this.coder = coder;
+    }
+
+    public synchronized T getValue() {
+      if (value == null) {
+        value = deserialize();
+      }
+      return value;
+    }
+
+    public void broadcast(JavaSparkContext jsc) {
+      this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
+    }
+
+    private T deserialize() {
+      T val;
+      try {
+        val = coder.decode(new ByteArrayInputStream(bcast.value()),
+            new Coder.Context(true));
+      } catch (IOException ioe) {
+        // this should not ever happen, log it if it does.
+        LOG.warn(ioe.getMessage());
+        val = null;
+      }
+      return val;
     }
-    return val;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2d00b3bd/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
index 195766e..e1af3cf 100644
--- a/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/TransformTranslator.java
@@ -599,7 +599,7 @@ public final class TransformTranslator {
       for (PCollectionView<?> view : views) {
         Iterable<WindowedValue<?>> collectionView = context.getPCollectionView(view);
         Coder<Iterable<WindowedValue<?>>> coderInternal = view.getCoderInternal();
-        BroadcastHelper<?> helper = new BroadcastHelper<>(collectionView, coderInternal);
+        BroadcastHelper<?> helper = BroadcastHelper.create(collectionView, coderInternal);
         //broadcast side inputs
         helper.broadcast(context.getSparkContext());
         sideInputs.put(view.getTagInternal(), helper);


Mime
View raw message