hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stak...@apache.org
Subject hive git commit: HIVE-18214: Flaky test: TestSparkClient (Sahil Takiar, reviewed by Peter Vary)
Date Tue, 16 Jan 2018 17:19:00 GMT
Repository: hive
Updated Branches:
  refs/heads/master 2402f3ca5 -> 87860fbca


HIVE-18214: Flaky test: TestSparkClient (Sahil Takiar, reviewed by Peter Vary)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87860fbc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87860fbc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87860fbc

Branch: refs/heads/master
Commit: 87860fbca42b2477f504c8b1cefd43c865a2629c
Parents: 2402f3c
Author: Sahil Takiar <stakiar@cloudera.com>
Authored: Tue Jan 16 09:18:32 2018 -0800
Committer: Sahil Takiar <stakiar@cloudera.com>
Committed: Tue Jan 16 09:18:32 2018 -0800

----------------------------------------------------------------------
 .../hive/spark/client/SparkClientFactory.java   |   3 -
 .../hive/spark/client/SparkClientImpl.java      | 528 +++++++++----------
 .../hive/spark/client/TestSparkClient.java      |  48 +-
 3 files changed, 265 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/87860fbc/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
index 50c7bb2..8abeed8 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientFactory.java
@@ -34,9 +34,6 @@ import com.google.common.base.Throwables;
 @InterfaceAudience.Private
 public final class SparkClientFactory {
 
-  /** Used to run the driver in-process, mostly for testing. */
-  static final String CONF_KEY_IN_PROCESS = "spark.client.do_not_use.run_driver_in_process";
-
   /** Used by client and driver to share a client ID for establishing an RPC session. */
   static final String CONF_CLIENT_ID = "spark.client.authentication.client_id";
 

http://git-wip-us.apache.org/repos/asf/hive/blob/87860fbc/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index 49b7deb..eed8e53 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -214,324 +214,294 @@ class SparkClientImpl implements SparkClient {
     final String serverAddress = rpcServer.getAddress();
     final String serverPort = String.valueOf(rpcServer.getPort());
 
-    if (conf.containsKey(SparkClientFactory.CONF_KEY_IN_PROCESS)) {
-      // Mostly for testing things quickly. Do not do this in production.
-      // when invoked in-process it inherits the environment variables of the parent
-      LOG.warn("!!!! Running remote driver in-process. !!!!");
-      runnable = new Runnable() {
-        @Override
-        public void run() {
-          List<String> args = Lists.newArrayList();
-          args.add("--remote-host");
-          args.add(serverAddress);
-          args.add("--remote-port");
-          args.add(serverPort);
-          args.add("--client-id");
-          args.add(clientId);
-          args.add("--secret");
-          args.add(secret);
-
-          for (Map.Entry<String, String> e : conf.entrySet()) {
-            args.add("--conf");
-            args.add(String.format("%s=%s", e.getKey(), conf.get(e.getKey())));
-          }
-          try {
-            RemoteDriver.main(args.toArray(new String[args.size()]));
-          } catch (Exception e) {
-            LOG.error("Error running driver.", e);
-          }
-        }
-      };
-    } else {
-      // If a Spark installation is provided, use the spark-submit script. Otherwise, call
the
-      // SparkSubmit class directly, which has some caveats (like having to provide a proper
-      // version of Guava on the classpath depending on the deploy mode).
-      String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
-      if (sparkHome == null) {
-        sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
-      }
+    // If a Spark installation is provided, use the spark-submit script. Otherwise, call
the
+    // SparkSubmit class directly, which has some caveats (like having to provide a proper
+    // version of Guava on the classpath depending on the deploy mode).
+    String sparkHome = Strings.emptyToNull(conf.get(SPARK_HOME_KEY));
+    if (sparkHome == null) {
+      sparkHome = Strings.emptyToNull(System.getenv(SPARK_HOME_ENV));
+    }
+    if (sparkHome == null) {
+      sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
+    }
+    String sparkLogDir = conf.get("hive.spark.log.dir");
+    if (sparkLogDir == null) {
       if (sparkHome == null) {
-        sparkHome = Strings.emptyToNull(System.getProperty(SPARK_HOME_KEY));
-      }
-      String sparkLogDir = conf.get("hive.spark.log.dir");
-      if (sparkLogDir == null) {
-        if (sparkHome == null) {
-          sparkLogDir = "./target/";
-        } else {
-          sparkLogDir = sparkHome + "/logs/";
-        }
+        sparkLogDir = "./target/";
+      } else {
+        sparkLogDir = sparkHome + "/logs/";
       }
+    }
 
-      String osxTestOpts = "";
-      if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac"))
{
-        osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
-      }
+    String osxTestOpts = "";
+    if (Strings.nullToEmpty(System.getProperty("os.name")).toLowerCase().contains("mac"))
{
+      osxTestOpts = Strings.nullToEmpty(System.getenv(OSX_TEST_OPTS));
+    }
 
-      String driverJavaOpts = Joiner.on(" ").skipNulls().join(
-          "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
-      String executorJavaOpts = Joiner.on(" ").skipNulls().join(
-          "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
-
-      // Create a file with all the job properties to be read by spark-submit. Change the
-      // file's permissions so that only the owner can read it. This avoid having the
-      // connection secret show up in the child process's command line.
-      File properties = File.createTempFile("spark-submit.", ".properties");
-      if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
-        throw new IOException("Cannot change permissions of job properties file.");
-      }
-      properties.deleteOnExit();
+    String driverJavaOpts = Joiner.on(" ").skipNulls().join(
+        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(DRIVER_OPTS_KEY));
+    String executorJavaOpts = Joiner.on(" ").skipNulls().join(
+        "-Dhive.spark.log.dir=" + sparkLogDir, osxTestOpts, conf.get(EXECUTOR_OPTS_KEY));
+
+    // Create a file with all the job properties to be read by spark-submit. Change the
+    // file's permissions so that only the owner can read it. This avoid having the
+    // connection secret show up in the child process's command line.
+    File properties = File.createTempFile("spark-submit.", ".properties");
+    if (!properties.setReadable(false) || !properties.setReadable(true, true)) {
+      throw new IOException("Cannot change permissions of job properties file.");
+    }
+    properties.deleteOnExit();
 
-      Properties allProps = new Properties();
-      // first load the defaults from spark-defaults.conf if available
-      try {
-        URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
-        if (sparkDefaultsUrl != null) {
-          LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
-          allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
-        }
-      } catch (Exception e) {
-        String msg = "Exception trying to load spark-defaults.conf: " + e;
-        throw new IOException(msg, e);
-      }
-      // then load the SparkClientImpl config
-      for (Map.Entry<String, String> e : conf.entrySet()) {
-        allProps.put(e.getKey(), conf.get(e.getKey()));
+    Properties allProps = new Properties();
+    // first load the defaults from spark-defaults.conf if available
+    try {
+      URL sparkDefaultsUrl = Thread.currentThread().getContextClassLoader().getResource("spark-defaults.conf");
+      if (sparkDefaultsUrl != null) {
+        LOG.info("Loading spark defaults: " + sparkDefaultsUrl);
+        allProps.load(new ByteArrayInputStream(Resources.toByteArray(sparkDefaultsUrl)));
       }
-      allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
-      allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
-      allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
-      allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
-
-      String isTesting = conf.get("spark.testing");
-      if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
-        String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
-        if (!hiveHadoopTestClasspath.isEmpty()) {
-          String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
-          if (extraDriverClasspath.isEmpty()) {
-            allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
-          } else {
-            extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath
: extraDriverClasspath + File.pathSeparator;
-            allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
-          }
+    } catch (Exception e) {
+      String msg = "Exception trying to load spark-defaults.conf: " + e;
+      throw new IOException(msg, e);
+    }
+    // then load the SparkClientImpl config
+    for (Map.Entry<String, String> e : conf.entrySet()) {
+      allProps.put(e.getKey(), conf.get(e.getKey()));
+    }
+    allProps.put(SparkClientFactory.CONF_CLIENT_ID, clientId);
+    allProps.put(SparkClientFactory.CONF_KEY_SECRET, secret);
+    allProps.put(DRIVER_OPTS_KEY, driverJavaOpts);
+    allProps.put(EXECUTOR_OPTS_KEY, executorJavaOpts);
+
+    String isTesting = conf.get("spark.testing");
+    if (isTesting != null && isTesting.equalsIgnoreCase("true")) {
+      String hiveHadoopTestClasspath = Strings.nullToEmpty(System.getenv("HIVE_HADOOP_TEST_CLASSPATH"));
+      if (!hiveHadoopTestClasspath.isEmpty()) {
+        String extraDriverClasspath = Strings.nullToEmpty((String)allProps.get(DRIVER_EXTRA_CLASSPATH));
+        if (extraDriverClasspath.isEmpty()) {
+          allProps.put(DRIVER_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+        } else {
+          extraDriverClasspath = extraDriverClasspath.endsWith(File.pathSeparator) ? extraDriverClasspath
: extraDriverClasspath + File.pathSeparator;
+          allProps.put(DRIVER_EXTRA_CLASSPATH, extraDriverClasspath + hiveHadoopTestClasspath);
+        }
 
-          String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
-          if (extraExecutorClasspath.isEmpty()) {
-            allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
-          } else {
-            extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator)
? extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
-            allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
-          }
+        String extraExecutorClasspath = Strings.nullToEmpty((String)allProps.get(EXECUTOR_EXTRA_CLASSPATH));
+        if (extraExecutorClasspath.isEmpty()) {
+          allProps.put(EXECUTOR_EXTRA_CLASSPATH, hiveHadoopTestClasspath);
+        } else {
+          extraExecutorClasspath = extraExecutorClasspath.endsWith(File.pathSeparator) ?
extraExecutorClasspath : extraExecutorClasspath + File.pathSeparator;
+          allProps.put(EXECUTOR_EXTRA_CLASSPATH, extraExecutorClasspath + hiveHadoopTestClasspath);
         }
       }
+    }
 
-      Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
-      try {
-        allProps.store(writer, "Spark Context configuration");
-      } finally {
-        writer.close();
-      }
+    Writer writer = new OutputStreamWriter(new FileOutputStream(properties), Charsets.UTF_8);
+    try {
+      allProps.store(writer, "Spark Context configuration");
+    } finally {
+      writer.close();
+    }
 
-      // Define how to pass options to the child process. If launching in client (or local)
-      // mode, the driver options need to be passed directly on the command line. Otherwise,
-      // SparkSubmit will take care of that for us.
-      String master = conf.get("spark.master");
-      Preconditions.checkArgument(master != null, "spark.master is not defined.");
-      String deployMode = conf.get("spark.submit.deployMode");
+    // Define how to pass options to the child process. If launching in client (or local)
+    // mode, the driver options need to be passed directly on the command line. Otherwise,
+    // SparkSubmit will take care of that for us.
+    String master = conf.get("spark.master");
+    Preconditions.checkArgument(master != null, "spark.master is not defined.");
+    String deployMode = conf.get("spark.submit.deployMode");
 
-      List<String> argv = Lists.newLinkedList();
+    List<String> argv = Lists.newLinkedList();
 
-      if (sparkHome != null) {
-        argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
-      } else {
-        LOG.info("No spark.home provided, calling SparkSubmit directly.");
-        argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
-
-        if (master.startsWith("local") || master.startsWith("mesos") ||
-            SparkClientUtilities.isYarnClientMode(master, deployMode) ||
-            master.startsWith("spark")) {
-          String mem = conf.get("spark.driver.memory");
-          if (mem != null) {
-            argv.add("-Xms" + mem);
-            argv.add("-Xmx" + mem);
-          }
+    if (sparkHome != null) {
+      argv.add(new File(sparkHome, "bin/spark-submit").getAbsolutePath());
+    } else {
+      LOG.info("No spark.home provided, calling SparkSubmit directly.");
+      argv.add(new File(System.getProperty("java.home"), "bin/java").getAbsolutePath());
+
+      if (master.startsWith("local") || master.startsWith("mesos") ||
+          SparkClientUtilities.isYarnClientMode(master, deployMode) ||
+          master.startsWith("spark")) {
+        String mem = conf.get("spark.driver.memory");
+        if (mem != null) {
+          argv.add("-Xms" + mem);
+          argv.add("-Xmx" + mem);
+        }
 
-          String cp = conf.get("spark.driver.extraClassPath");
-          if (cp != null) {
-            argv.add("-classpath");
-            argv.add(cp);
-          }
+        String cp = conf.get("spark.driver.extraClassPath");
+        if (cp != null) {
+          argv.add("-classpath");
+          argv.add(cp);
+        }
 
-          String libPath = conf.get("spark.driver.extraLibPath");
-          if (libPath != null) {
-            argv.add("-Djava.library.path=" + libPath);
-          }
+        String libPath = conf.get("spark.driver.extraLibPath");
+        if (libPath != null) {
+          argv.add("-Djava.library.path=" + libPath);
+        }
 
-          String extra = conf.get(DRIVER_OPTS_KEY);
-          if (extra != null) {
-            for (String opt : extra.split("[ ]")) {
-              if (!opt.trim().isEmpty()) {
-                argv.add(opt.trim());
-              }
+        String extra = conf.get(DRIVER_OPTS_KEY);
+        if (extra != null) {
+          for (String opt : extra.split("[ ]")) {
+            if (!opt.trim().isEmpty()) {
+              argv.add(opt.trim());
             }
           }
         }
-
-        argv.add("org.apache.spark.deploy.SparkSubmit");
       }
 
-      if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
-        String executorCores = conf.get("spark.executor.cores");
-        if (executorCores != null) {
-          argv.add("--executor-cores");
-          argv.add(executorCores);
-        }
+      argv.add("org.apache.spark.deploy.SparkSubmit");
+    }
 
-        String executorMemory = conf.get("spark.executor.memory");
-        if (executorMemory != null) {
-          argv.add("--executor-memory");
-          argv.add(executorMemory);
-        }
+    if (SparkClientUtilities.isYarnClusterMode(master, deployMode)) {
+      String executorCores = conf.get("spark.executor.cores");
+      if (executorCores != null) {
+        argv.add("--executor-cores");
+        argv.add(executorCores);
+      }
 
-        String numOfExecutors = conf.get("spark.executor.instances");
-        if (numOfExecutors != null) {
-          argv.add("--num-executors");
-          argv.add(numOfExecutors);
-        }
+      String executorMemory = conf.get("spark.executor.memory");
+      if (executorMemory != null) {
+        argv.add("--executor-memory");
+        argv.add(executorMemory);
       }
-      // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh
-      // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or
-      // delegation token renewal, but not both. Since doAs is a more common case, if both
-      // are needed, we choose to favor doAs. So when doAs is enabled, we use kinit command,
-      // otherwise, we pass the principal/keypad to spark to support the token renewal for
-      // long-running application.
-      if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
-        String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
-            "0.0.0.0");
-        String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
-        if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile))
{
-          if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
-            List<String> kinitArgv = Lists.newLinkedList();
-            kinitArgv.add("kinit");
-            kinitArgv.add(principal);
-            kinitArgv.add("-k");
-            kinitArgv.add("-t");
-            kinitArgv.add(keyTabFile + ";");
-            kinitArgv.addAll(argv);
-            argv = kinitArgv;
-          } else {
-            // if doAs is not enabled, we pass the principal/keypad to spark-submit in order
to
-            // support the possible delegation token renewal in Spark
-            argv.add("--principal");
-            argv.add(principal);
-            argv.add("--keytab");
-            argv.add(keyTabFile);
-          }
+
+      String numOfExecutors = conf.get("spark.executor.instances");
+      if (numOfExecutors != null) {
+        argv.add("--num-executors");
+        argv.add(numOfExecutors);
+      }
+    }
+    // The options --principal/--keypad do not work with --proxy-user in spark-submit.sh
+    // (see HIVE-15485, SPARK-5493, SPARK-19143), so Hive could only support doAs or
+    // delegation token renewal, but not both. Since doAs is a more common case, if both
+    // are needed, we choose to favor doAs. So when doAs is enabled, we use kinit command,
+    // otherwise, we pass the principal/keypad to spark to support the token renewal for
+    // long-running application.
+    if ("kerberos".equals(hiveConf.get(HADOOP_SECURITY_AUTHENTICATION))) {
+      String principal = SecurityUtil.getServerPrincipal(hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
+          "0.0.0.0");
+      String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
+      if (StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(keyTabFile))
{
+        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+          List<String> kinitArgv = Lists.newLinkedList();
+          kinitArgv.add("kinit");
+          kinitArgv.add(principal);
+          kinitArgv.add("-k");
+          kinitArgv.add("-t");
+          kinitArgv.add(keyTabFile + ";");
+          kinitArgv.addAll(argv);
+          argv = kinitArgv;
+        } else {
+          // if doAs is not enabled, we pass the principal/keypad to spark-submit in order
to
+          // support the possible delegation token renewal in Spark
+          argv.add("--principal");
+          argv.add(principal);
+          argv.add("--keytab");
+          argv.add(keyTabFile);
         }
       }
-      if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
-        try {
-          String currentUser = Utils.getUGI().getShortUserName();
-          // do not do impersonation in CLI mode
-          if (!currentUser.equals(System.getProperty("user.name"))) {
-            LOG.info("Attempting impersonation of " + currentUser);
-            argv.add("--proxy-user");
-            argv.add(currentUser);
-          }
-        } catch (Exception e) {
-          String msg = "Cannot obtain username: " + e;
-          throw new IllegalStateException(msg, e);
+    }
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
+      try {
+        String currentUser = Utils.getUGI().getShortUserName();
+        // do not do impersonation in CLI mode
+        if (!currentUser.equals(System.getProperty("user.name"))) {
+          LOG.info("Attempting impersonation of " + currentUser);
+          argv.add("--proxy-user");
+          argv.add(currentUser);
         }
+      } catch (Exception e) {
+        String msg = "Cannot obtain username: " + e;
+        throw new IllegalStateException(msg, e);
       }
+    }
 
-      String regStr = conf.get("spark.kryo.registrator");
-      if (HIVE_KRYO_REG_NAME.equals(regStr)) {
-        argv.add("--jars");
-        argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
-      }
+    String regStr = conf.get("spark.kryo.registrator");
+    if (HIVE_KRYO_REG_NAME.equals(regStr)) {
+      argv.add("--jars");
+      argv.add(SparkClientUtilities.findKryoRegistratorJar(hiveConf));
+    }
 
-      argv.add("--properties-file");
-      argv.add(properties.getAbsolutePath());
-      argv.add("--class");
-      argv.add(RemoteDriver.class.getName());
+    argv.add("--properties-file");
+    argv.add(properties.getAbsolutePath());
+    argv.add("--class");
+    argv.add(RemoteDriver.class.getName());
 
-      String jar = "spark-internal";
-      if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
-        jar = SparkContext.jarOfClass(this.getClass()).get();
-      }
-      argv.add(jar);
-
-      argv.add("--remote-host");
-      argv.add(serverAddress);
-      argv.add("--remote-port");
-      argv.add(serverPort);
-
-      //hive.spark.* keys are passed down to the RemoteDriver via --conf,
-      //as --properties-file contains the spark.* keys that are meant for SparkConf object.
-      for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
-        String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
-        argv.add("--conf");
-        argv.add(String.format("%s=%s", hiveSparkConfKey, value));
-      }
+    String jar = "spark-internal";
+    if (SparkContext.jarOfClass(this.getClass()).isDefined()) {
+      jar = SparkContext.jarOfClass(this.getClass()).get();
+    }
+    argv.add(jar);
+
+    argv.add("--remote-host");
+    argv.add(serverAddress);
+    argv.add("--remote-port");
+    argv.add(serverPort);
+
+    //hive.spark.* keys are passed down to the RemoteDriver via --conf,
+    //as --properties-file contains the spark.* keys that are meant for SparkConf object.
+    for (String hiveSparkConfKey : RpcConfiguration.HIVE_SPARK_RSC_CONFIGS) {
+      String value = RpcConfiguration.getValue(hiveConf, hiveSparkConfKey);
+      argv.add("--conf");
+      argv.add(String.format("%s=%s", hiveSparkConfKey, value));
+    }
 
-      String cmd = Joiner.on(" ").join(argv);
-      LOG.info("Running client driver with argv: {}", cmd);
-      ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
-
-      // Prevent hive configurations from being visible in Spark.
-      pb.environment().remove("HIVE_HOME");
-      pb.environment().remove("HIVE_CONF_DIR");
-      // Add credential provider password to the child process's environment
-      // In case of Spark the credential provider location is provided in the jobConf when
the job is submitted
-      String password = getSparkJobCredentialProviderPassword();
-      if(password != null) {
-        pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
-      }
-      if (isTesting != null) {
-        pb.environment().put("SPARK_TESTING", isTesting);
-      }
+    String cmd = Joiner.on(" ").join(argv);
+    LOG.info("Running client driver with argv: {}", cmd);
+    ProcessBuilder pb = new ProcessBuilder("sh", "-c", cmd);
+
+    // Prevent hive configurations from being visible in Spark.
+    pb.environment().remove("HIVE_HOME");
+    pb.environment().remove("HIVE_CONF_DIR");
+    // Add credential provider password to the child process's environment
+    // In case of Spark the credential provider location is provided in the jobConf when
the job is submitted
+    String password = getSparkJobCredentialProviderPassword();
+    if(password != null) {
+      pb.environment().put(Constants.HADOOP_CREDENTIAL_PASSWORD_ENVVAR, password);
+    }
+    if (isTesting != null) {
+      pb.environment().put("SPARK_TESTING", isTesting);
+    }
 
-      final Process child = pb.start();
-      String threadName = Thread.currentThread().getName();
-      final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
-      final LogRedirector.LogSourceCallback callback = () -> {return isAlive;};
+    final Process child = pb.start();
+    String threadName = Thread.currentThread().getName();
+    final List<String> childErrorLog = Collections.synchronizedList(new ArrayList<String>());
+    final LogRedirector.LogSourceCallback callback = () -> {return isAlive;};
 
-      LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName,
-          new LogRedirector(child.getInputStream(), LOG, callback));
-      LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName,
-          new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
+    LogRedirector.redirect("RemoteDriver-stdout-redir-" + threadName,
+        new LogRedirector(child.getInputStream(), LOG, callback));
+    LogRedirector.redirect("RemoteDriver-stderr-redir-" + threadName,
+        new LogRedirector(child.getErrorStream(), LOG, childErrorLog, callback));
 
-      runnable = new Runnable() {
-        @Override
-        public void run() {
-          try {
-            int exitCode = child.waitFor();
-            if (exitCode != 0) {
-              StringBuilder errStr = new StringBuilder();
-              synchronized(childErrorLog) {
-                Iterator iter = childErrorLog.iterator();
-                while(iter.hasNext()){
-                  errStr.append(iter.next());
-                  errStr.append('\n');
-                }
+    runnable = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          int exitCode = child.waitFor();
+          if (exitCode != 0) {
+            StringBuilder errStr = new StringBuilder();
+            synchronized(childErrorLog) {
+              Iterator iter = childErrorLog.iterator();
+              while(iter.hasNext()){
+                errStr.append(iter.next());
+                errStr.append('\n');
               }
-
-              LOG.warn("Child process exited with code {}", exitCode);
-              rpcServer.cancelClient(clientId,
-                  "Child process (spark-submit) exited before connecting back with error
log " + errStr.toString());
             }
-          } catch (InterruptedException ie) {
-            LOG.warn("Thread waiting on the child process (spark-submit) is interrupted,
killing the child process.");
-            rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit)
is interrupted");
-            Thread.interrupted();
-            child.destroy();
-          } catch (Exception e) {
-            String errMsg = "Exception while waiting for child process (spark-submit)";
-            LOG.warn(errMsg, e);
-            rpcServer.cancelClient(clientId, errMsg);
+
+            LOG.warn("Child process exited with code {}", exitCode);
+            rpcServer.cancelClient(clientId,
+                "Child process (spark-submit) exited before connecting back with error log
" + errStr.toString());
           }
+        } catch (InterruptedException ie) {
+          LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing
the child process.");
+          rpcServer.cancelClient(clientId, "Thread waiting on the child porcess (spark-submit)
is interrupted");
+          Thread.interrupted();
+          child.destroy();
+        } catch (Exception e) {
+          String errMsg = "Exception while waiting for child process (spark-submit)";
+          LOG.warn(errMsg, e);
+          rpcServer.cancelClient(clientId, errMsg);
         }
-      };
-    }
+      }
+    };
 
     Thread thread = new Thread(runnable);
     thread.setDaemon(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/87860fbc/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
index 697d8d1..23df792 100644
--- a/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
+++ b/spark-client/src/test/java/org/apache/hive/spark/client/TestSparkClient.java
@@ -68,19 +68,14 @@ public class TestSparkClient {
   private static final long TIMEOUT = 20;
   private static final HiveConf HIVECONF = new HiveConf();
 
-  private Map<String, String> createConf(boolean local) {
+  private Map<String, String> createConf() {
     Map<String, String> conf = new HashMap<String, String>();
-    if (local) {
-      conf.put(SparkClientFactory.CONF_KEY_IN_PROCESS, "true");
-      conf.put("spark.master", "local");
-      conf.put("spark.app.name", "SparkClientSuite Local App");
-    } else {
-      String classpath = System.getProperty("java.class.path");
-      conf.put("spark.master", "local");
-      conf.put("spark.app.name", "SparkClientSuite Remote App");
-      conf.put("spark.driver.extraClassPath", classpath);
-      conf.put("spark.executor.extraClassPath", classpath);
-    }
+
+    String classpath = System.getProperty("java.class.path");
+    conf.put("spark.master", "local");
+    conf.put("spark.app.name", "SparkClientSuite Remote App");
+    conf.put("spark.driver.extraClassPath", classpath);
+    conf.put("spark.executor.extraClassPath", classpath);
 
     if (!Strings.isNullOrEmpty(System.getProperty("spark.home"))) {
       conf.put("spark.home", System.getProperty("spark.home"));
@@ -91,7 +86,7 @@ public class TestSparkClient {
 
   @Test
   public void testJobSubmission() throws Exception {
-    runTest(true, new TestFunction() {
+    runTest(new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
         JobHandle.Listener<String> listener = newListener();
@@ -112,7 +107,7 @@ public class TestSparkClient {
 
   @Test
   public void testSimpleSparkJob() throws Exception {
-    runTest(true, new TestFunction() {
+    runTest(new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
         JobHandle<Long> handle = client.submit(new SparkJob());
@@ -123,7 +118,7 @@ public class TestSparkClient {
 
   @Test
   public void testErrorJob() throws Exception {
-    runTest(true, new TestFunction() {
+    runTest(new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
         JobHandle.Listener<String> listener = newListener();
@@ -151,7 +146,7 @@ public class TestSparkClient {
 
   @Test
   public void testSyncRpc() throws Exception {
-    runTest(true, new TestFunction() {
+    runTest(new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
         Future<String> result = client.run(new SyncRpc());
@@ -161,19 +156,8 @@ public class TestSparkClient {
   }
 
   @Test
-  public void testRemoteClient() throws Exception {
-    runTest(false, new TestFunction() {
-      @Override
-      public void call(SparkClient client) throws Exception {
-        JobHandle<Long> handle = client.submit(new SparkJob());
-        assertEquals(Long.valueOf(5L), handle.get(TIMEOUT, TimeUnit.SECONDS));
-      }
-    });
-  }
-
-  @Test
   public void testMetricsCollection() throws Exception {
-    runTest(true, new TestFunction() {
+    runTest(new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
         JobHandle.Listener<Integer> listener = newListener();
@@ -202,7 +186,7 @@ public class TestSparkClient {
 
   @Test
   public void testAddJarsAndFiles() throws Exception {
-    runTest(true, new TestFunction() {
+    runTest(new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
         File jar = null;
@@ -256,7 +240,7 @@ public class TestSparkClient {
 
   @Test
   public void testCounters() throws Exception {
-    runTest(true, new TestFunction() {
+    runTest(new TestFunction() {
       @Override
       public void call(SparkClient client) throws Exception {
         JobHandle<?> job = client.submit(new CounterIncrementJob());
@@ -308,8 +292,8 @@ public class TestSparkClient {
     }).when(listener);
   }
 
-  private void runTest(boolean local, TestFunction test) throws Exception {
-    Map<String, String> conf = createConf(local);
+  private void runTest(TestFunction test) throws Exception {
+    Map<String, String> conf = createConf();
     SparkClientFactory.initialize(conf);
     SparkClient client = null;
     try {


Mime
View raw message