crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject git commit: CRUNCH-435: Cleaner controls for configuring and initializing the SparkContext and runtime Configuration.
Date Mon, 07 Jul 2014 01:01:27 GMT
Repository: crunch
Updated Branches:
  refs/heads/master ebe306126 -> 8434dcaab


CRUNCH-435: Cleaner controls for configuring and initializing the
SparkContext and runtime Configuration.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8434dcaa
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8434dcaa
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8434dcaa

Branch: refs/heads/master
Commit: 8434dcaab5504e69c42b914551634b2a9ac838b2
Parents: ebe3061
Author: Josh Wills <jwills@apache.org>
Authored: Tue Jul 1 17:07:06 2014 -0700
Committer: Josh Wills <jwills@apache.org>
Committed: Sun Jul 6 17:57:00 2014 -0700

----------------------------------------------------------------------
 .../apache/crunch/impl/spark/SparkPipeline.java | 30 ++++++++++++++++----
 .../crunch/impl/spark/SparkRuntimeContext.java  |  8 ++----
 2 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8434dcaa/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 1076c33..7a69707 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -36,6 +36,7 @@ import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.storage.StorageLevel;
 
@@ -55,13 +56,17 @@ public class SparkPipeline extends DistributedPipeline {
   }
 
   public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass) {
-    super(appName, new Configuration(), new SparkCollectFactory());
+    this(sparkConnect, appName, jarClass, new Configuration());
+  }
+
+  public SparkPipeline(String sparkConnect, String appName, Class<?> jarClass, Configuration conf) {
+    super(appName, conf, new SparkCollectFactory());
     this.sparkConnect = Preconditions.checkNotNull(sparkConnect);
     this.jarClass = jarClass;
   }
 
   public SparkPipeline(JavaSparkContext sparkContext, String appName) {
-    super(appName, new Configuration(), new SparkCollectFactory());
+    super(appName, sparkContext.hadoopConfiguration(), new SparkCollectFactory());
     this.sparkContext = Preconditions.checkNotNull(sparkContext);
     this.sparkConnect = sparkContext.getSparkHome().orNull();
   }
@@ -120,8 +125,16 @@ public class SparkPipeline extends DistributedPipeline {
         outputTargetsToMaterialize.remove(c);
       }
     }
+
+    Configuration conf = getConfiguration();
     if (sparkContext == null) {
-      this.sparkContext = new JavaSparkContext(sparkConnect, getName());
+      SparkConf sparkConf = new SparkConf();
+      for (Map.Entry<String, String> e : conf) {
+        if (e.getKey().startsWith("spark.")) {
+          sparkConf.set(e.getKey(), e.getValue());
+        }
+      }
+      this.sparkContext = new JavaSparkContext(sparkConnect, getName(), sparkConf);
       if (jarClass != null) {
         String[] jars = JavaSparkContext.jarOfClass(jarClass);
         if (jars != null && jars.length > 0) {
@@ -131,8 +144,9 @@ public class SparkPipeline extends DistributedPipeline {
         }
       }
     }
-    SparkRuntime runtime = new SparkRuntime(this, sparkContext, getConfiguration(), outputTargets, toMaterialize,
-        cachedCollections);
+
+    copyConfiguration(conf, sparkContext.hadoopConfiguration());
+    SparkRuntime runtime = new SparkRuntime(this, sparkContext, conf, outputTargets, toMaterialize, cachedCollections);
     runtime.execute();
     outputTargets.clear();
     return runtime;
@@ -147,4 +161,10 @@ public class SparkPipeline extends DistributedPipeline {
     }
     return res;
   }
+
+  private static void copyConfiguration(Configuration from, Configuration to) {
+    for (Map.Entry<String, String> e : from) {
+      to.set(e.getKey(), e.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/8434dcaa/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
index cea317c..ca52c29 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntimeContext.java
@@ -20,6 +20,7 @@ package org.apache.crunch.impl.spark;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.io.ByteStreams;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.TaskInputOutputContextFactory;
@@ -34,8 +35,6 @@ import org.apache.spark.Accumulator;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.broadcast.Broadcast;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
@@ -59,6 +58,7 @@ public class SparkRuntimeContext implements Serializable {
 
   public void setConf(Broadcast<byte[]> broadConf) {
     this.broadConf = broadConf;
+    this.conf = null;
   }
 
   public void initialize(DoFn<?, ?> fn) {
@@ -94,9 +94,7 @@ public class SparkRuntimeContext implements Serializable {
     if (conf == null) {
       conf = new Configuration();
       try {
-        ByteArrayInputStream bais = new ByteArrayInputStream(broadConf.value());
-        conf.readFields(new DataInputStream(bais));
-        bais.close();
+        conf.readFields(ByteStreams.newDataInput(broadConf.value()));
       } catch (Exception e) {
         throw new RuntimeException("Error reading broadcast configuration", e);
       }


Mime
View raw message