hive-commits mailing list archives

From xu...@apache.org
Subject svn commit: r1612643 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: SparkClient.java SparkTask.java
Date Tue, 22 Jul 2014 18:42:51 GMT
Author: xuefu
Date: Tue Jul 22 18:42:51 2014
New Revision: 1612643

URL: http://svn.apache.org/r1612643
Log:
HIVE-7436: Load Spark configuration into Hive driver [Spark Branch]
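
This change replaces the hard-coded defaults and ad-hoc environment-variable probing (SPARK_HOME, MASTER, REDUCERS, and so on) with three layered sources: built-in defaults, a spark-defaults.conf found on the driver's classpath, and any "spark"-prefixed entries from the Hive configuration, applied in that order so that later sources win. For illustration only (this file is not part of the commit), a spark-defaults.conf the new loader would pick up might look like:

    # Hypothetical spark-defaults.conf, for illustration.
    # Every property whose name starts with "spark" is copied into SparkConf;
    # matching entries in the Hive configuration are applied afterwards and win.
    spark.master            local
    spark.app.name          Hive on Spark
    spark.executor.memory   1g
    spark.serializer        org.apache.spark.serializer.KryoSerializer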

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java?rev=1612643&r1=1612642&r2=1612643&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkClient.java Tue Jul 22 18:42:51 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,57 +46,25 @@ import org.apache.spark.api.java.JavaPai
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 public class SparkClient implements Serializable {
   private static final long serialVersionUID = 1L;
   protected static transient final Log LOG = LogFactory.getLog(SparkClient.class);
 
-  private static String masterUrl = "local";
+  private static final String SPARK_DEFAULT_CONF_FILE = "spark-defaults.conf";
+  private static final String SPARK_DEFAULT_MASTER = "local";
+  private static final String SPARK_DEFAULT_APP_NAME = "Hive on Spark";
 
-  private static String appName = "Hive-Spark";
-
-  private static String sparkHome = "/home/xzhang/apache/spark";
-  
-  private static int reducerCount = 1;
+  private static SparkClient client;
   
-  private static String execMem = "1g";
-  private static String execJvmOpts = "";
-
-  static {
-    String envSparkHome = System.getenv("SPARK_HOME");
-    if (envSparkHome != null) {
-      sparkHome = envSparkHome;
-    }
-
-    String envMaster = System.getenv("MASTER");
-    if (envMaster != null) {
-      masterUrl = envMaster;
-    }
-
-    String reducers = System.getenv("REDUCERS");
-    if (reducers != null) {
-      reducerCount = Integer.valueOf(reducers);
-    }
-
-    String mem = System.getenv("spark_executor_memory");
-    if (mem != null) {
-      execMem = mem;
-    }
-
-    String jopts = System.getenv("spark_executor_extraJavaOptions");
-    if (jopts != null) {
-      execJvmOpts = jopts;
+  public static synchronized SparkClient getInstance(Configuration hiveConf) {
+    if (client == null) {
+      client = new SparkClient(hiveConf);
     }
-
-  }
-  
-  private static SparkClient client = new SparkClient();
-  
-  public static SparkClient getInstance() {
     return client;
   }
   
@@ -105,13 +74,60 @@ public class SparkClient implements Seri
 
   private List<String> localFiles = new ArrayList<String>();
 
-  private SparkClient() {
-    SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(masterUrl).setSparkHome(sparkHome);
-    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
-    sparkConf.set("spark.default.parallelism", "1");
-    sparkConf.set("spark.executor.memory", execMem);
-    sparkConf.set("spark.executor.extraJavaOptions", execJvmOpts);
-    sc = new JavaSparkContext(sparkConf);
+  private SparkClient(Configuration hiveConf) {
+    sc = new JavaSparkContext(initiateSparkConf(hiveConf));
+  }
+
+  private SparkConf initiateSparkConf(Configuration hiveConf) {
+    SparkConf sparkConf = new SparkConf();
+
+    // set default spark configurations.
+    sparkConf.set("spark.master", SPARK_DEFAULT_MASTER);
+    sparkConf.set("spark.app.name", SAPRK_DEFAULT_APP_NAME);
+
+    // load properties from spark-defaults.conf.
+    InputStream inputStream = null;
+    try {
+      inputStream = this.getClass().getClassLoader().getResourceAsStream(SPARK_DEFAULT_CONF_FILE);
+      if (inputStream != null) {
+        LOG.info("loading spark properties from:" + SPARK_DEFAULT_CONF_FILE);
+        Properties properties = new Properties();
+        properties.load(inputStream);
+        for (String propertyName : properties.stringPropertyNames()) {
+          if (propertyName.startsWith("spark")) {
+            String value = properties.getProperty(propertyName);
+            sparkConf.set(propertyName, value);
+            LOG.info(String.format("load spark configuration from %s (%s -> %s).",
+              SPARK_DEFAULT_CONF_FILE, propertyName, value));
+          }
+        }
+      }
+    } catch (IOException e) {
+      LOG.info("Failed to open spark configuration file:" + SPARK_DEFAULT_CONF_FILE, e);
+    } finally {
+      if (inputStream != null) {
+        try {
+          inputStream.close();
+        } catch (IOException e) {
+          LOG.debug("Failed to close inputstream.", e);
+        }
+      }
+    }
+
+    // load properties from hive configurations.
+    Iterator<Map.Entry<String, String>> iterator = hiveConf.iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<String, String> entry = iterator.next();
+      String propertyName = entry.getKey();
+      if (propertyName.startsWith("spark")) {
+        String value = entry.getValue();
+        sparkConf.set(propertyName, value);
+        LOG.info(String.format("load spark configuration from hive configuration (%s ->
%s).",
+          propertyName, value));
+      }
+    }
+
+    return sparkConf;
   }
 
   public int execute(DriverContext driverContext, SparkWork sparkWork) {
@@ -191,7 +207,7 @@ public class SparkClient implements Seri
         }
       }
     } else {
-      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(reducerCount/*redWork.getNumReduceTasks()*/)); // Two partitions.
+      JavaPairRDD rdd3 = rdd2.partitionBy(new HashPartitioner(1/*redWork.getNumReduceTasks()*/)); // Two partitions.
       HiveReduceFunction rf = new HiveReduceFunction(confBytes);
       JavaPairRDD rdd4 = rdd3.mapPartitionsToPair(rf);
       rdd4.foreach(HiveVoidFunction.getInstance());
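
The merge order that the new initiateSparkConf() establishes can be summarized in a minimal standalone sketch. This is not the patch itself: it substitutes plain java.util types for HiveConf and SparkConf so it compiles on its own, but it follows the same "spark" prefix filter and override sequence.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    // Sketch of the configuration precedence in initiateSparkConf():
    // built-in defaults < spark-defaults.conf < Hive configuration.
    public class SparkConfPrecedenceSketch {
      public static Map<String, String> resolve(Map<String, String> hiveProps)
          throws IOException {
        Map<String, String> conf = new HashMap<String, String>();
        // 1. Built-in defaults (mirrors SPARK_DEFAULT_MASTER and the app name).
        conf.put("spark.master", "local");
        conf.put("spark.app.name", "Hive on Spark");
        // 2. spark-defaults.conf from the classpath, if present, overrides them.
        InputStream in = SparkConfPrecedenceSketch.class.getClassLoader()
            .getResourceAsStream("spark-defaults.conf");
        if (in != null) {
          try {
            Properties props = new Properties();
            props.load(in);
            for (String name : props.stringPropertyNames()) {
              if (name.startsWith("spark")) {
                conf.put(name, props.getProperty(name));
              }
            }
          } finally {
            in.close();
          }
        }
        // 3. "spark"-prefixed entries from the Hive configuration win over both.
        for (Map.Entry<String, String> e : hiveProps.entrySet()) {
          if (e.getKey().startsWith("spark")) {
            conf.put(e.getKey(), e.getValue());
          }
        }
        return conf;
      }
    }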

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java?rev=1612643&r1=1612642&r2=1612643&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java Tue Jul 22 18:42:51 2014
@@ -28,7 +28,7 @@ public class SparkTask extends Task<Spar
 
   @Override
   public int execute(DriverContext driverContext) {
-    SparkClient client = SparkClient.getInstance();
+    SparkClient client = SparkClient.getInstance(driverContext.getCtx().getConf());
     return client.execute(driverContext, getWork());
   }
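
Worth noting for callers: getInstance() caches the client, so the Hive configuration passed by the first SparkTask to execute is the one that shapes the SparkConf; spark.* properties changed later in the session are not re-read unless a new SparkClient is created.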
 


