spark-commits mailing list archives

From pwend...@apache.org
Subject [2/3] spark git commit: [SPARK-4924] Add a library for launching Spark jobs programmatically.
Date Wed, 11 Mar 2015 08:03:04 GMT
http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/pom.xml
----------------------------------------------------------------------
diff --git a/launcher/pom.xml b/launcher/pom.xml
new file mode 100644
index 0000000..ccbd9d0
--- /dev/null
+++ b/launcher/pom.xml
@@ -0,0 +1,83 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.10</artifactId>
+    <version>1.3.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>spark-launcher_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Launcher Project</name>
+  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>launcher</sbt.project.name>
+  </properties>
+
+  <dependencies>
+    <!-- NOTE: only test-scope dependencies are allowed in this module. -->
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <!-- Not needed by the test code, but referenced by SparkSubmit, which is used by the tests. -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
new file mode 100644
index 0000000..dc90e9e
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.jar.JarFile;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Abstract Spark command builder that defines common functionality.
+ */
+abstract class AbstractCommandBuilder {
+
+  boolean verbose;
+  String appName;
+  String appResource;
+  String deployMode;
+  String javaHome;
+  String mainClass;
+  String master;
+  String propertiesFile;
+  final List<String> appArgs;
+  final List<String> jars;
+  final List<String> files;
+  final List<String> pyFiles;
+  final Map<String, String> childEnv;
+  final Map<String, String> conf;
+
+  public AbstractCommandBuilder() {
+    this.appArgs = new ArrayList<String>();
+    this.childEnv = new HashMap<String, String>();
+    this.conf = new HashMap<String, String>();
+    this.files = new ArrayList<String>();
+    this.jars = new ArrayList<String>();
+    this.pyFiles = new ArrayList<String>();
+  }
+
+  /**
+   * Builds the command to execute.
+   *
+   * @param env A map containing environment variables for the child process. It may already contain
+   *            entries defined by the user (such as SPARK_HOME, or those defined by the
+   *            SparkLauncher constructor that takes an environment), and may be modified to
+   *            include other variables needed by the process to be executed.
+   */
+  abstract List<String> buildCommand(Map<String, String> env) throws IOException;
+
+  /**
+   * Builds a list of arguments to run java.
+   *
+   * This method finds the java executable to use and appends JVM-specific options for running a
+   * class with Spark in the classpath. It also loads options from the "java-opts" file in the
+   * configuration directory being used.
+   *
+   * Callers should still add at least the class to run, as well as any arguments to pass to the
+   * class.
+   */
+  List<String> buildJavaCommand(String extraClassPath) throws IOException {
+    List<String> cmd = new ArrayList<String>();
+    if (javaHome == null) {
+      cmd.add(join(File.separator, System.getProperty("java.home"), "bin", "java"));
+    } else {
+      cmd.add(join(File.separator, javaHome, "bin", "java"));
+    }
+
+    // Load extra JAVA_OPTS from conf/java-opts, if it exists.
+    File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
+    if (javaOpts.isFile()) {
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+          new FileInputStream(javaOpts), "UTF-8"));
+      try {
+        String line;
+        while ((line = br.readLine()) != null) {
+          addOptionString(cmd, line);
+        }
+      } finally {
+        br.close();
+      }
+    }
+
+    cmd.add("-cp");
+    cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
+    return cmd;
+  }
+
+  /**
+   * Adds the default perm gen size option for Spark if the VM requires it and the user hasn't
+   * set it.
+   */
+  void addPermGenSizeOpt(List<String> cmd) {
+    // Don't set MaxPermSize for Java 8 and later.
+    String[] version = System.getProperty("java.version").split("\\.");
+    if (Integer.parseInt(version[0]) > 1 || Integer.parseInt(version[1]) > 7) {
+      return;
+    }
+
+    for (String arg : cmd) {
+      if (arg.startsWith("-XX:MaxPermSize=")) {
+        return;
+      }
+    }
+
+    cmd.add("-XX:MaxPermSize=128m");
+  }
+
+  void addOptionString(List<String> cmd, String options) {
+    if (!isEmpty(options)) {
+      for (String opt : parseOptionString(options)) {
+        cmd.add(opt);
+      }
+    }
+  }
+
+  /**
+   * Builds the classpath for the application. Returns a list with one classpath entry per element;
+   * each entry is formatted in the way expected by <i>java.net.URLClassLoader</i> (more
+   * specifically, with trailing slashes for directories).
+   */
+  List<String> buildClassPath(String appClassPath) throws IOException {
+    String sparkHome = getSparkHome();
+    String scala = getScalaVersion();
+
+    List<String> cp = new ArrayList<String>();
+    addToClassPath(cp, getenv("SPARK_CLASSPATH"));
+    addToClassPath(cp, appClassPath);
+
+    addToClassPath(cp, getConfDir());
+
+    boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES"));
+    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
+    if (prependClasses || isTesting) {
+      List<String> projects = Arrays.asList("core", "repl", "mllib", "bagel", "graphx",
+        "streaming", "tools", "sql/catalyst", "sql/core", "sql/hive", "sql/hive-thriftserver",
+        "yarn", "launcher");
+      if (prependClasses) {
+        System.err.println(
+          "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
+          "assembly.");
+        for (String project : projects) {
+          addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
+            scala));
+        }
+      }
+      if (isTesting) {
+        for (String project : projects) {
+          addToClassPath(cp, String.format("%s/%s/target/scala-%s/test-classes", sparkHome,
+            project, scala));
+        }
+      }
+
+      // Add this path to include jars that are shaded in the final deliverable created during
+      // the maven build. These jars are copied to this directory during the build.
+      addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
+    }
+
+    String assembly = findAssembly(scala);
+    addToClassPath(cp, assembly);
+
+    // When Hive support is needed, Datanucleus jars must be included on the classpath. Datanucleus
+    // jars do not work if only included in the uber jar as plugin.xml metadata is lost. Both sbt
+    // and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is built
+    // with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
+    // assembly is built for Hive, before actually populating the CLASSPATH with the jars.
+    //
+    // This block also serves as a check for SPARK-1703, when the assembly jar is built with
+    // Java 7 and ends up with too many files, causing issues with other JDK versions.
+    boolean needsDataNucleus = false;
+    JarFile assemblyJar = null;
+    try {
+      assemblyJar = new JarFile(assembly);
+      needsDataNucleus = assemblyJar.getEntry("org/apache/hadoop/hive/ql/exec/") != null;
+    } catch (IOException ioe) {
+      if (ioe.getMessage().indexOf("invalid CEN header") >= 0) {
+        System.err.println(
+          "Loading Spark jar failed.\n" +
+          "This is likely because Spark was compiled with Java 7 and run\n" +
+          "with Java 6 (see SPARK-1703). Please use Java 7 to run Spark\n" +
+          "or build Spark with Java 6.");
+        System.exit(1);
+      } else {
+        throw ioe;
+      }
+    } finally {
+      if (assemblyJar != null) {
+        try {
+          assemblyJar.close();
+        } catch (IOException e) {
+          // Ignore.
+        }
+      }
+    }
+
+    if (needsDataNucleus) {
+      System.err.println("Spark assembly has been built with Hive, including Datanucleus jars " +
+        "in classpath.");
+      File libdir;
+      if (new File(sparkHome, "RELEASE").isFile()) {
+        libdir = new File(sparkHome, "lib");
+      } else {
+        libdir = new File(sparkHome, "lib_managed/jars");
+      }
+
+      checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+        libdir.getAbsolutePath());
+      for (File jar : libdir.listFiles()) {
+        if (jar.getName().startsWith("datanucleus-")) {
+          addToClassPath(cp, jar.getAbsolutePath());
+        }
+      }
+    }
+
+    addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
+    addToClassPath(cp, getenv("YARN_CONF_DIR"));
+    addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
+    return cp;
+  }
+
+  /**
+   * Adds entries to the classpath.
+   *
+   * @param cp List to which the new entries are appended.
+   * @param entries New classpath entries (separated by File.pathSeparator).
+   */
+  private void addToClassPath(List<String> cp, String entries) {
+    if (isEmpty(entries)) {
+      return;
+    }
+    String[] split = entries.split(Pattern.quote(File.pathSeparator));
+    for (String entry : split) {
+      if (!isEmpty(entry)) {
+        if (new File(entry).isDirectory() && !entry.endsWith(File.separator)) {
+          entry += File.separator;
+        }
+        cp.add(entry);
+      }
+    }
+  }
+
+  String getScalaVersion() {
+    String scala = getenv("SPARK_SCALA_VERSION");
+    if (scala != null) {
+      return scala;
+    }
+
+    String sparkHome = getSparkHome();
+    File scala210 = new File(sparkHome, "assembly/target/scala-2.10");
+    File scala211 = new File(sparkHome, "assembly/target/scala-2.11");
+    checkState(!scala210.isDirectory() || !scala211.isDirectory(),
+      "Presence of build for both scala versions (2.10 and 2.11) detected.\n" +
+      "Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
+    if (scala210.isDirectory()) {
+      return "2.10";
+    } else {
+      checkState(scala211.isDirectory(), "Cannot find any assembly build directories.");
+      return "2.11";
+    }
+  }
+
+  String getSparkHome() {
+    String path = getenv(ENV_SPARK_HOME);
+    checkState(path != null,
+      "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+    return path;
+  }
+
+  /**
+   * Loads the configuration file for the application, if it exists. This is either the
+   * user-specified properties file, or the spark-defaults.conf file under the Spark configuration
+   * directory.
+   */
+  Properties loadPropertiesFile() throws IOException {
+    Properties props = new Properties();
+    File propsFile;
+    if (propertiesFile != null) {
+      propsFile = new File(propertiesFile);
+      checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile);
+    } else {
+      propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE);
+    }
+
+    if (propsFile.isFile()) {
+      FileInputStream fd = null;
+      try {
+        fd = new FileInputStream(propsFile);
+        props.load(new InputStreamReader(fd, "UTF-8"));
+      } finally {
+        if (fd != null) {
+          try {
+            fd.close();
+          } catch (IOException e) {
+            // Ignore.
+          }
+        }
+      }
+    }
+
+    return props;
+  }
+
+  String getenv(String key) {
+    return firstNonEmpty(childEnv.get(key), System.getenv(key));
+  }
+
+  private String findAssembly(String scalaVersion) {
+    String sparkHome = getSparkHome();
+    File libdir;
+    if (new File(sparkHome, "RELEASE").isFile()) {
+      libdir = new File(sparkHome, "lib");
+      checkState(libdir.isDirectory(), "Library directory '%s' does not exist.",
+          libdir.getAbsolutePath());
+    } else {
+      libdir = new File(sparkHome, String.format("assembly/target/scala-%s", scalaVersion));
+    }
+
+    final Pattern re = Pattern.compile("spark-assembly.*hadoop.*\\.jar");
+    FileFilter filter = new FileFilter() {
+      @Override
+      public boolean accept(File file) {
+        return file.isFile() && re.matcher(file.getName()).matches();
+      }
+    };
+    File[] assemblies = libdir.listFiles(filter);
+    checkState(assemblies != null && assemblies.length > 0, "No assemblies found in '%s'.", libdir);
+    checkState(assemblies.length == 1, "Multiple assemblies found in '%s'.", libdir);
+    return assemblies[0].getAbsolutePath();
+  }
+
+  private String getConfDir() {
+    String confDir = getenv("SPARK_CONF_DIR");
+    return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
+  }
+
+}
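
To illustrate the buildJavaCommand() contract above (callers add the main class and its arguments themselves), here is a minimal sketch of a hypothetical subclass; the EchoCommandBuilder name and org.example.Echo class are placeholders, not part of this commit:

    package org.apache.spark.launcher;

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    // Minimal hypothetical subclass; "org.example.Echo" is a placeholder class.
    class EchoCommandBuilder extends AbstractCommandBuilder {
      @Override
      List<String> buildCommand(Map<String, String> env) throws IOException {
        List<String> cmd = buildJavaCommand(null); // java executable, java-opts, -cp <classpath>
        addPermGenSizeOpt(cmd);                    // adds -XX:MaxPermSize=128m on JDK 7 and earlier
        cmd.add("org.example.Echo");               // the caller adds the class to run...
        cmd.addAll(appArgs);                       // ...and any arguments for it
        return cmd;
      }
    }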

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
new file mode 100644
index 0000000..9b04732
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/CommandBuilderUtils.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper methods for command builders.
+ */
+class CommandBuilderUtils {
+
+  static final String DEFAULT_MEM = "512m";
+  static final String DEFAULT_PROPERTIES_FILE = "spark-defaults.conf";
+  static final String ENV_SPARK_HOME = "SPARK_HOME";
+
+  /** Returns whether the given string is null or empty. */
+  static boolean isEmpty(String s) {
+    return s == null || s.isEmpty();
+  }
+
+  /** Joins a list of strings using the given separator. */
+  static String join(String sep, String... elements) {
+    StringBuilder sb = new StringBuilder();
+    for (String e : elements) {
+      if (e != null) {
+        if (sb.length() > 0) {
+          sb.append(sep);
+        }
+        sb.append(e);
+      }
+    }
+    return sb.toString();
+  }
+
+  /** Joins a list of strings using the given separator. */
+  static String join(String sep, Iterable<String> elements) {
+    StringBuilder sb = new StringBuilder();
+    for (String e : elements) {
+      if (e != null) {
+        if (sb.length() > 0) {
+          sb.append(sep);
+        }
+        sb.append(e);
+      }
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Returns the first non-empty value mapped to the given key in the given maps, or null otherwise.
+   */
+  static String firstNonEmptyValue(String key, Map<?, ?>... maps) {
+    for (Map<?, ?> map : maps) {
+      String value = (String) map.get(key);
+      if (!isEmpty(value)) {
+        return value;
+      }
+    }
+    return null;
+  }
+
+  /** Returns the first non-empty, non-null string in the given list, or null otherwise. */
+  static String firstNonEmpty(String... candidates) {
+    for (String s : candidates) {
+      if (!isEmpty(s)) {
+        return s;
+      }
+    }
+    return null;
+  }
+
+  /** Returns the name of the env variable that holds the native library path. */
+  static String getLibPathEnvName() {
+    if (isWindows()) {
+      return "PATH";
+    }
+
+    String os = System.getProperty("os.name");
+    if (os.startsWith("Mac OS X")) {
+      return "DYLD_LIBRARY_PATH";
+    } else {
+      return "LD_LIBRARY_PATH";
+    }
+  }
+
+  /** Returns whether the OS is Windows. */
+  static boolean isWindows() {
+    String os = System.getProperty("os.name");
+    return os.startsWith("Windows");
+  }
+
+  /**
+   * Updates the user environment, appending the given pathList to the existing value of the given
+   * environment variable (or setting it if it hasn't yet been set).
+   */
+  static void mergeEnvPathList(Map<String, String> userEnv, String envKey, String pathList) {
+    if (!isEmpty(pathList)) {
+      String current = firstNonEmpty(userEnv.get(envKey), System.getenv(envKey));
+      userEnv.put(envKey, join(File.pathSeparator, current, pathList));
+    }
+  }
+
+  /**
+   * Parse a string as if it were a list of arguments, following bash semantics.
+   * For example:
+   *
+   * Input: "\"ab cd\" efgh 'i \" j'"
+   * Output: [ "ab cd", "efgh", "i \" j" ]
+   */
+  static List<String> parseOptionString(String s) {
+    List<String> opts = new ArrayList<String>();
+    StringBuilder opt = new StringBuilder();
+    boolean inOpt = false;
+    boolean inSingleQuote = false;
+    boolean inDoubleQuote = false;
+    boolean escapeNext = false;
+
+    // This is needed to detect when a quoted empty string is used as an argument ("" or '').
+    boolean hasData = false;
+
+    for (int i = 0; i < s.length(); i++) {
+      int c = s.codePointAt(i);
+      if (escapeNext) {
+        opt.appendCodePoint(c);
+        escapeNext = false;
+      } else if (inOpt) {
+        switch (c) {
+        case '\\':
+          if (inSingleQuote) {
+            opt.appendCodePoint(c);
+          } else {
+            escapeNext = true;
+          }
+          break;
+        case '\'':
+          if (inDoubleQuote) {
+            opt.appendCodePoint(c);
+          } else {
+            inSingleQuote = !inSingleQuote;
+          }
+          break;
+        case '"':
+          if (inSingleQuote) {
+            opt.appendCodePoint(c);
+          } else {
+            inDoubleQuote = !inDoubleQuote;
+          }
+          break;
+        default:
+          if (!Character.isWhitespace(c) || inSingleQuote || inDoubleQuote) {
+            opt.appendCodePoint(c);
+          } else {
+            opts.add(opt.toString());
+            opt.setLength(0);
+            inOpt = false;
+            hasData = false;
+          }
+        }
+      } else {
+        switch (c) {
+        case '\'':
+          inSingleQuote = true;
+          inOpt = true;
+          hasData = true;
+          break;
+        case '"':
+          inDoubleQuote = true;
+          inOpt = true;
+          hasData = true;
+          break;
+        case '\\':
+          escapeNext = true;
+          inOpt = true;
+          hasData = true;
+          break;
+        default:
+          if (!Character.isWhitespace(c)) {
+            inOpt = true;
+            hasData = true;
+            opt.appendCodePoint(c);
+          }
+        }
+      }
+    }
+
+    checkArgument(!inSingleQuote && !inDoubleQuote && !escapeNext, "Invalid option string: %s", s);
+    if (hasData) {
+      opts.add(opt.toString());
+    }
+    return opts;
+  }
+
+  /** Throws IllegalArgumentException if the given object is null. */
+  static void checkNotNull(Object o, String arg) {
+    if (o == null) {
+      throw new IllegalArgumentException(String.format("'%s' must not be null.", arg));
+    }
+  }
+
+  /** Throws IllegalArgumentException with the given message if the check is false. */
+  static void checkArgument(boolean check, String msg, Object... args) {
+    if (!check) {
+      throw new IllegalArgumentException(String.format(msg, args));
+    }
+  }
+
+  /** Throws IllegalStateException with the given message if the check is false. */
+  static void checkState(boolean check, String msg, Object... args) {
+    if (!check) {
+      throw new IllegalStateException(String.format(msg, args));
+    }
+  }
+
+  /**
+   * Quote a command argument for a command to be run by a Windows batch script, if the argument
+   * needs quoting. Arguments only seem to need quotes in batch scripts if they have certain
+   * special characters, some of which need extra (and different) escaping.
+   *
+   *  For example:
+   *    original single argument: ab="cde fgh"
+   *    quoted: "ab^=""cde fgh"""
+   */
+  static String quoteForBatchScript(String arg) {
+
+    boolean needsQuotes = false;
+    for (int i = 0; i < arg.length(); i++) {
+      int c = arg.codePointAt(i);
+      if (Character.isWhitespace(c) || c == '"' || c == '=') {
+        needsQuotes = true;
+        break;
+      }
+    }
+    if (!needsQuotes) {
+      return arg;
+    }
+    StringBuilder quoted = new StringBuilder();
+    quoted.append("\"");
+    for (int i = 0; i < arg.length(); i++) {
+      int cp = arg.codePointAt(i);
+      switch (cp) {
+      case '"':
+        quoted.append('"');
+        break;
+
+      case '=':
+        quoted.append('^');
+        break;
+
+      default:
+        break;
+      }
+      quoted.appendCodePoint(cp);
+    }
+    quoted.append("\"");
+    return quoted.toString();
+  }
+
+  /**
+   * Quotes a string so that it can be used in a command string and be parsed back into a single
+   * argument by python's "shlex.split()" function.
+   *
+   * Basically, just add simple escapes. E.g.:
+   *    original single argument : ab "cd" ef
+   *    after: "ab \"cd\" ef"
+   */
+  static String quoteForPython(String s) {
+    StringBuilder quoted = new StringBuilder().append('"');
+    for (int i = 0; i < s.length(); i++) {
+      int cp = s.codePointAt(i);
+      if (cp == '"' || cp == '\\') {
+        quoted.appendCodePoint('\\');
+      }
+      quoted.appendCodePoint(cp);
+    }
+    return quoted.append('"').toString();
+  }
+
+}
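
A quick sketch of what the parsing and quoting helpers above produce for the inputs given in their javadoc examples (placed in the same package, since the class is package-private):

    package org.apache.spark.launcher;

    // Exercises the helpers with the inputs from the javadoc examples above.
    class CommandBuilderUtilsDemo {
      public static void main(String[] args) {
        // Bash-style parsing: prints [ab cd, efgh, i " j]
        System.out.println(CommandBuilderUtils.parseOptionString("\"ab cd\" efgh 'i \" j'"));
        // Windows batch quoting: prints "ab^=""cde fgh"""
        System.out.println(CommandBuilderUtils.quoteForBatchScript("ab=\"cde fgh\""));
        // Python shlex quoting: prints "ab \"cd\" ef"
        System.out.println(CommandBuilderUtils.quoteForPython("ab \"cd\" ef"));
      }
    }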

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/Main.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/Main.java b/launcher/src/main/java/org/apache/spark/launcher/Main.java
new file mode 100644
index 0000000..206acfb
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/Main.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command line interface for the Spark launcher. Used internally by Spark scripts.
+ */
+class Main {
+
+  /**
+   * Usage: Main [class] [class args]
+   * <p/>
+   * This CLI works in two different modes:
+   * <ul>
+   *   <li>"spark-submit": if <i>class</i> is "org.apache.spark.deploy.SparkSubmit", the
+   *   {@link SparkLauncher} class is used to launch a Spark application.</li>
+   *   <li>"spark-class": if another class is provided, an internal Spark class is run.</li>
+   * </ul>
+   *
+   * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
+   * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
+   * <p/>
+   * On Unix-like systems, the output is a list of command arguments, separated by the NULL
+   * character. On Windows, the output is a command line suitable for direct execution from the
+   * script.
+   */
+  public static void main(String[] argsArray) throws Exception {
+    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
+
+    List<String> args = new ArrayList<String>(Arrays.asList(argsArray));
+    String className = args.remove(0);
+
+    boolean printLaunchCommand;
+    boolean printUsage;
+    AbstractCommandBuilder builder;
+    try {
+      if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
+        builder = new SparkSubmitCommandBuilder(args);
+      } else {
+        builder = new SparkClassCommandBuilder(className, args);
+      }
+      printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
+      printUsage = false;
+    } catch (IllegalArgumentException e) {
+      builder = new UsageCommandBuilder(e.getMessage());
+      printLaunchCommand = false;
+      printUsage = true;
+    }
+
+    Map<String, String> env = new HashMap<String, String>();
+    List<String> cmd = builder.buildCommand(env);
+    if (printLaunchCommand) {
+      System.err.println("Spark Command: " + join(" ", cmd));
+      System.err.println("========================================");
+    }
+
+    if (isWindows()) {
+      // When printing the usage message, we can't use "cmd /v" since that prevents the env
+      // variable from being seen in the caller script. So do not call prepareWindowsCommand().
+      if (printUsage) {
+        System.out.println(join(" ", cmd));
+      } else {
+        System.out.println(prepareWindowsCommand(cmd, env));
+      }
+    } else {
+      // In bash, use NULL as the arg separator since it cannot be used in an argument.
+      List<String> bashCmd = prepareBashCommand(cmd, env);
+      for (String c : bashCmd) {
+        System.out.print(c);
+        System.out.print('\0');
+      }
+    }
+  }
+
+  /**
+   * Prepare a command line for execution from a Windows batch script.
+   *
+   * The method quotes all arguments so that spaces are handled as expected. Quotes within arguments
+   * are "double quoted" (which is batch for escaping a quote). This page has more details about
+   * quoting and other batch script fun stuff: http://ss64.com/nt/syntax-esc.html
+   *
+   * The command is executed using "cmd /c" and formatted as a single line, since that's the
+   * easiest way to consume it from a batch script (see spark-class2.cmd).
+   */
+  private static String prepareWindowsCommand(List<String> cmd, Map<String, String> childEnv) {
+    StringBuilder cmdline = new StringBuilder("cmd /c \"");
+    for (Map.Entry<String, String> e : childEnv.entrySet()) {
+      cmdline.append(String.format("set %s=%s", e.getKey(), e.getValue()));
+      cmdline.append(" && ");
+    }
+    for (String arg : cmd) {
+      cmdline.append(quoteForBatchScript(arg));
+      cmdline.append(" ");
+    }
+    cmdline.append("\"");
+    return cmdline.toString();
+  }
+
+  /**
+   * Prepare the command for execution from a bash script. The final command will include commands
+   * to set up any environment variables needed by the child process.
+   */
+  private static List<String> prepareBashCommand(List<String> cmd, Map<String, String> childEnv) {
+    if (childEnv.isEmpty()) {
+      return cmd;
+    }
+
+    List<String> newCmd = new ArrayList<String>();
+    newCmd.add("env");
+
+    for (Map.Entry<String, String> e : childEnv.entrySet()) {
+      newCmd.add(String.format("%s=%s", e.getKey(), e.getValue()));
+    }
+    newCmd.addAll(cmd);
+    return newCmd;
+  }
+
+  /**
+   * Internal builder used when command line parsing fails. This will behave differently depending
+   * on the platform:
+   *
+   * - On Unix-like systems, it will print a call to the "usage" function with two arguments: the
+   *   error string, and the exit code to use. The function is expected to print the command's
+   *   usage and exit with the provided exit code. The script should use "export -f usage" after
+   *   declaring a function called "usage", so that the function is available to downstream scripts.
+   *
+   * - On Windows it will set the variable "SPARK_LAUNCHER_USAGE_ERROR" to the usage error message.
+   *   The batch script should check for this variable and print its usage, since batch scripts
+   *   don't really support the "export -f" functionality used in bash.
+   */
+  private static class UsageCommandBuilder extends AbstractCommandBuilder {
+
+    private final String message;
+
+    UsageCommandBuilder(String message) {
+      this.message = message;
+    }
+
+    @Override
+    public List<String> buildCommand(Map<String, String> env) {
+      if (isWindows()) {
+        return Arrays.asList("set", "SPARK_LAUNCHER_USAGE_ERROR=" + message);
+      } else {
+        return Arrays.asList("usage", message, "1");
+      }
+    }
+
+  }
+
+}
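
On the Unix side, the NUL-separated output described above is what "bin/spark-class" reads back into an argument array. As a hedged sketch, splitting such output in Java would look like this (the sample output string is made up):

    package org.apache.spark.launcher;

    import java.util.Arrays;
    import java.util.List;

    // Splits a NUL-separated command line, as emitted by Main on Unix-like systems.
    class NulOutputDemo {
      public static void main(String[] args) {
        String output = "java\0-cp\0/opt/spark/conf\0org.apache.spark.deploy.master.Master\0";
        List<String> cmd = Arrays.asList(output.split("\0"));
        // Prints: [java, -cp, /opt/spark/conf, org.apache.spark.deploy.master.Master]
        System.out.println(cmd);
      }
    }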

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
new file mode 100644
index 0000000..e601a0a
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Command builder for internal Spark classes.
+ * <p/>
+ * This class handles building the command to launch all internal Spark classes except for
+ * SparkSubmit (which is handled by the {@link SparkSubmitCommandBuilder} class).
+ */
+class SparkClassCommandBuilder extends AbstractCommandBuilder {
+
+  private final String className;
+  private final List<String> classArgs;
+
+  SparkClassCommandBuilder(String className, List<String> classArgs) {
+    this.className = className;
+    this.classArgs = classArgs;
+  }
+
+  @Override
+  public List<String> buildCommand(Map<String, String> env) throws IOException {
+    List<String> javaOptsKeys = new ArrayList<String>();
+    String memKey = null;
+    String extraClassPath = null;
+
+    // Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) +
+    // SPARK_DAEMON_MEMORY.
+    if (className.equals("org.apache.spark.deploy.master.Master")) {
+      javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+      javaOptsKeys.add("SPARK_MASTER_OPTS");
+      memKey = "SPARK_DAEMON_MEMORY";
+    } else if (className.equals("org.apache.spark.deploy.worker.Worker")) {
+      javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+      javaOptsKeys.add("SPARK_WORKER_OPTS");
+      memKey = "SPARK_DAEMON_MEMORY";
+    } else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) {
+      javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
+      javaOptsKeys.add("SPARK_HISTORY_OPTS");
+      memKey = "SPARK_DAEMON_MEMORY";
+    } else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) {
+      javaOptsKeys.add("SPARK_JAVA_OPTS");
+      javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+      memKey = "SPARK_EXECUTOR_MEMORY";
+    } else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) {
+      javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
+      memKey = "SPARK_EXECUTOR_MEMORY";
+    } else if (className.startsWith("org.apache.spark.tools.")) {
+      String sparkHome = getSparkHome();
+      File toolsDir = new File(join(File.separator, sparkHome, "tools", "target",
+        "scala-" + getScalaVersion()));
+      checkState(toolsDir.isDirectory(), "Cannot find tools build directory.");
+
+      Pattern re = Pattern.compile("spark-tools_.*\\.jar");
+      for (File f : toolsDir.listFiles()) {
+        if (re.matcher(f.getName()).matches()) {
+          extraClassPath = f.getAbsolutePath();
+          break;
+        }
+      }
+
+      checkState(extraClassPath != null,
+        "Failed to find Spark Tools Jar in %s.\n" +
+        "You need to run \"build/sbt tools/package\" before running %s.",
+        toolsDir.getAbsolutePath(), className);
+
+      javaOptsKeys.add("SPARK_JAVA_OPTS");
+    }
+
+    List<String> cmd = buildJavaCommand(extraClassPath);
+    for (String key : javaOptsKeys) {
+      addOptionString(cmd, System.getenv(key));
+    }
+
+    String mem = firstNonEmpty(memKey != null ? System.getenv(memKey) : null, DEFAULT_MEM);
+    cmd.add("-Xms" + mem);
+    cmd.add("-Xmx" + mem);
+    addPermGenSizeOpt(cmd);
+    cmd.add(className);
+    cmd.addAll(classArgs);
+    return cmd;
+  }
+
+}
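
As a usage sketch (hypothetical, placed in the same package since the builder is package-private, and assuming SPARK_HOME points at a valid Spark build), constructing the standalone Master's launch command might look like:

    package org.apache.spark.launcher;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical driver: builds the launch command for the standalone Master.
    class SparkClassCommandDemo {
      public static void main(String[] args) throws Exception {
        List<String> classArgs = Arrays.asList("--host", "localhost", "--port", "7077");
        SparkClassCommandBuilder builder =
          new SparkClassCommandBuilder("org.apache.spark.deploy.master.Master", classArgs);
        Map<String, String> env = new HashMap<String, String>();
        // With SPARK_DAEMON_MEMORY=1g in the environment, the result contains
        // "-Xms1g -Xmx1g"; otherwise DEFAULT_MEM (512m) applies.
        System.out.println(CommandBuilderUtils.join(" ", builder.buildCommand(env)));
      }
    }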

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
new file mode 100644
index 0000000..b566507
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Launcher for Spark applications.
+ * <p/>
+ * Use this class to start Spark applications programmatically. The class uses a builder pattern
+ * to allow clients to configure the Spark application and launch it as a child process.
+ */
+public class SparkLauncher {
+
+  /** The Spark master. */
+  public static final String SPARK_MASTER = "spark.master";
+
+  /** Configuration key for the driver memory. */
+  public static final String DRIVER_MEMORY = "spark.driver.memory";
+  /** Configuration key for the driver class path. */
+  public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
+  /** Configuration key for the driver VM options. */
+  public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
+  /** Configuration key for the driver native library path. */
+  public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
+
+  /** Configuration key for the executor memory. */
+  public static final String EXECUTOR_MEMORY = "spark.executor.memory";
+  /** Configuration key for the executor class path. */
+  public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
+  /** Configuration key for the executor VM options. */
+  public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
+  /** Configuration key for the executor native library path. */
+  public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
+  /** Configuration key for the number of executor CPU cores. */
+  public static final String EXECUTOR_CORES = "spark.executor.cores";
+
+  private final SparkSubmitCommandBuilder builder;
+
+  public SparkLauncher() {
+    this(null);
+  }
+
+  /**
+   * Creates a launcher that will set the given environment variables in the child.
+   *
+   * @param env Environment variables to set.
+   */
+  public SparkLauncher(Map<String, String> env) {
+    this.builder = new SparkSubmitCommandBuilder();
+    if (env != null) {
+      this.builder.childEnv.putAll(env);
+    }
+  }
+
+  /**
+   * Set a custom JAVA_HOME for launching the Spark application.
+   *
+   * @param javaHome Path to the JAVA_HOME to use.
+   * @return This launcher.
+   */
+  public SparkLauncher setJavaHome(String javaHome) {
+    checkNotNull(javaHome, "javaHome");
+    builder.javaHome = javaHome;
+    return this;
+  }
+
+  /**
+   * Set a custom Spark installation location for the application.
+   *
+   * @param sparkHome Path to the Spark installation to use.
+   * @return This launcher.
+   */
+  public SparkLauncher setSparkHome(String sparkHome) {
+    checkNotNull(sparkHome, "sparkHome");
+    builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
+    return this;
+  }
+
+  /**
+   * Set a custom properties file with Spark configuration for the application.
+   *
+   * @param path Path to custom properties file to use.
+   * @return This launcher.
+   */
+  public SparkLauncher setPropertiesFile(String path) {
+    checkNotNull(path, "path");
+    builder.propertiesFile = path;
+    return this;
+  }
+
+  /**
+   * Set a single configuration value for the application.
+   *
+   * @param key Configuration key.
+   * @param value The value to use.
+   * @return This launcher.
+   */
+  public SparkLauncher setConf(String key, String value) {
+    checkNotNull(key, "key");
+    checkNotNull(value, "value");
+    checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
+    builder.conf.put(key, value);
+    return this;
+  }
+
+  /**
+   * Set the application name.
+   *
+   * @param appName Application name.
+   * @return This launcher.
+   */
+  public SparkLauncher setAppName(String appName) {
+    checkNotNull(appName, "appName");
+    builder.appName = appName;
+    return this;
+  }
+
+  /**
+   * Set the Spark master for the application.
+   *
+   * @param master Spark master.
+   * @return This launcher.
+   */
+  public SparkLauncher setMaster(String master) {
+    checkNotNull(master, "master");
+    builder.master = master;
+    return this;
+  }
+
+  /**
+   * Set the deploy mode for the application.
+   *
+   * @param mode Deploy mode.
+   * @return This launcher.
+   */
+  public SparkLauncher setDeployMode(String mode) {
+    checkNotNull(mode, "mode");
+    builder.deployMode = mode;
+    return this;
+  }
+
+  /**
+   * Set the main application resource. This should be the location of a jar file for Scala/Java
+   * applications, or a python script for PySpark applications.
+   *
+   * @param resource Path to the main application resource.
+   * @return This launcher.
+   */
+  public SparkLauncher setAppResource(String resource) {
+    checkNotNull(resource, "resource");
+    builder.appResource = resource;
+    return this;
+  }
+
+  /**
+   * Sets the application class name for Java/Scala applications.
+   *
+   * @param mainClass Application's main class.
+   * @return This launcher.
+   */
+  public SparkLauncher setMainClass(String mainClass) {
+    checkNotNull(mainClass, "mainClass");
+    builder.mainClass = mainClass;
+    return this;
+  }
+
+  /**
+   * Adds command line arguments for the application.
+   *
+   * @param args Arguments to pass to the application's main class.
+   * @return This launcher.
+   */
+  public SparkLauncher addAppArgs(String... args) {
+    for (String arg : args) {
+      checkNotNull(arg, "arg");
+      builder.appArgs.add(arg);
+    }
+    return this;
+  }
+
+  /**
+   * Adds a jar file to be submitted with the application.
+   *
+   * @param jar Path to the jar file.
+   * @return This launcher.
+   */
+  public SparkLauncher addJar(String jar) {
+    checkNotNull(jar, "jar");
+    builder.jars.add(jar);
+    return this;
+  }
+
+  /**
+   * Adds a file to be submitted with the application.
+   *
+   * @param file Path to the file.
+   * @return This launcher.
+   */
+  public SparkLauncher addFile(String file) {
+    checkNotNull(file, "file");
+    builder.files.add(file);
+    return this;
+  }
+
+  /**
+   * Adds a python file / zip / egg to be submitted with the application.
+   *
+   * @param file Path to the file.
+   * @return This launcher.
+   */
+  public SparkLauncher addPyFile(String file) {
+    checkNotNull(file, "file");
+    builder.pyFiles.add(file);
+    return this;
+  }
+
+  /**
+   * Enables verbose reporting for SparkSubmit.
+   *
+   * @param verbose Whether to enable verbose output.
+   * @return This launcher.
+   */
+  public SparkLauncher setVerbose(boolean verbose) {
+    builder.verbose = verbose;
+    return this;
+  }
+
+  /**
+   * Launches a sub-process that will start the configured Spark application.
+   *
+   * @return A process handle for the Spark app.
+   */
+  public Process launch() throws IOException {
+    List<String> cmd = new ArrayList<String>();
+    String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
+    cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));
+    cmd.addAll(builder.buildSparkSubmitArgs());
+
+    // Since the child process is a batch script, let's quote things so that special characters are
+    // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
+    // weird.
+    if (isWindows()) {
+      List<String> winCmd = new ArrayList<String>();
+      for (String arg : cmd) {
+        winCmd.add(quoteForBatchScript(arg));
+      }
+      cmd = winCmd;
+    }
+
+    ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
+    for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
+      pb.environment().put(e.getKey(), e.getValue());
+    }
+    return pb.start();
+  }
+
+}
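
For a usage example of the API above, a minimal sketch where the jar path and main class are placeholders, and SPARK_HOME is assumed to be set in the environment:

    import org.apache.spark.launcher.SparkLauncher;

    public class MyLauncher {
      public static void main(String[] args) throws Exception {
        Process spark = new SparkLauncher()
          .setAppResource("/my/app.jar")        // placeholder jar path
          .setMainClass("my.spark.app.Main")    // placeholder main class
          .setMaster("local[*]")
          .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
          .launch();
        spark.waitFor();                        // wait for spark-submit to finish
      }
    }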

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
new file mode 100644
index 0000000..6ffdff6
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+/**
+ * Special command builder for handling a CLI invocation of SparkSubmit.
+ * <p/>
+ * This builder adds command line parsing compatible with SparkSubmit. It handles setting
+ * driver-side options and the special parsing behavior needed for special-casing certain
+ * internal Spark applications.
+ * <p/>
+ * This class also has some special features to aid launching pyspark.
+ */
+class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
+
+  /**
+   * Name of the app resource used to identify the PySpark shell. The command line parser expects
+   * the resource name to be the very first argument to spark-submit in this case.
+   *
+   * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit
+   * (see java_gateway.py), and can cause this code to enter into an infinite loop.
+   */
+  static final String PYSPARK_SHELL = "pyspark-shell-main";
+
+  /**
+   * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
+   */
+  static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";
+
+  /**
+   * This map must match the class names for available special classes, since this modifies the way
+   * command line parsing works. This maps the class name to the resource to use when calling
+   * spark-submit.
+   */
+  private static final Map<String, String> specialClasses = new HashMap<String, String>();
+  static {
+    specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
+    specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
+      "spark-internal");
+    specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
+      "spark-internal");
+  }
+
+  private final List<String> sparkArgs;
+
+  /**
+   * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
+   * to parse the command lines for things like bin/spark-shell, which allows users to mix and
+   * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
+   */
+  private boolean allowsMixedArguments;
+
+  SparkSubmitCommandBuilder() {
+    this.sparkArgs = new ArrayList<String>();
+  }
+
+  SparkSubmitCommandBuilder(List<String> args) {
+    this();
+    List<String> submitArgs = args;
+    if (args.size() > 0 && args.get(0).equals(PYSPARK_SHELL)) {
+      this.allowsMixedArguments = true;
+      appResource = PYSPARK_SHELL_RESOURCE;
+      submitArgs = args.subList(1, args.size());
+    } else {
+      this.allowsMixedArguments = false;
+    }
+
+    new OptionParser().parse(submitArgs);
+  }
+
+  @Override
+  public List<String> buildCommand(Map<String, String> env) throws IOException {
+    if (PYSPARK_SHELL_RESOURCE.equals(appResource)) {
+      return buildPySparkShellCommand(env);
+    } else {
+      return buildSparkSubmitCommand(env);
+    }
+  }
+
+  List<String> buildSparkSubmitArgs() {
+    List<String> args = new ArrayList<String>();
+    SparkSubmitOptionParser parser = new SparkSubmitOptionParser();
+
+    if (verbose) {
+      args.add(parser.VERBOSE);
+    }
+
+    if (master != null) {
+      args.add(parser.MASTER);
+      args.add(master);
+    }
+
+    if (deployMode != null) {
+      args.add(parser.DEPLOY_MODE);
+      args.add(deployMode);
+    }
+
+    if (appName != null) {
+      args.add(parser.NAME);
+      args.add(appName);
+    }
+
+    for (Map.Entry<String, String> e : conf.entrySet()) {
+      args.add(parser.CONF);
+      args.add(String.format("%s=%s", e.getKey(), e.getValue()));
+    }
+
+    if (propertiesFile != null) {
+      args.add(parser.PROPERTIES_FILE);
+      args.add(propertiesFile);
+    }
+
+    if (!jars.isEmpty()) {
+      args.add(parser.JARS);
+      args.add(join(",", jars));
+    }
+
+    if (!files.isEmpty()) {
+      args.add(parser.FILES);
+      args.add(join(",", files));
+    }
+
+    if (!pyFiles.isEmpty()) {
+      args.add(parser.PY_FILES);
+      args.add(join(",", pyFiles));
+    }
+
+    if (mainClass != null) {
+      args.add(parser.CLASS);
+      args.add(mainClass);
+    }
+
+    args.addAll(sparkArgs);
+    if (appResource != null) {
+      args.add(appResource);
+    }
+    args.addAll(appArgs);
+
+    return args;
+  }
+
+  private List<String> buildSparkSubmitCommand(Map<String, String> env) throws IOException {
+    // Load the properties file and check whether spark-submit will be running the app's driver
+    // or just launching a cluster app. When running the driver, the JVM's arguments will be
+    // modified to cover the driver's configuration.
+    Properties props = loadPropertiesFile();
+    boolean isClientMode = isClientMode(props);
+    String extraClassPath = isClientMode ?
+      firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_CLASSPATH, conf, props) : null;
+
+    List<String> cmd = buildJavaCommand(extraClassPath);
+    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
+    addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));
+
+    if (isClientMode) {
+      // Figuring out where the memory value comes from is a little tricky due to precedence.
+      // Precedence is observed in the following order:
+      // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
+      // - properties file.
+      // - SPARK_DRIVER_MEMORY env variable
+      // - SPARK_MEM env variable
+      // - default value (512m)
+      String memory = firstNonEmpty(firstNonEmptyValue(SparkLauncher.DRIVER_MEMORY, conf, props),
+        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
+      cmd.add("-Xms" + memory);
+      cmd.add("-Xmx" + memory);
+      addOptionString(cmd, firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, conf, props));
+      mergeEnvPathList(env, getLibPathEnvName(),
+        firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+    }
+
+    addPermGenSizeOpt(cmd);
+    cmd.add("org.apache.spark.deploy.SparkSubmit");
+    cmd.addAll(buildSparkSubmitArgs());
+    return cmd;
+  }
+
+  private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
+    // For backwards compatibility, if a script is specified in
+    // the pyspark command line, then run it using spark-submit.
+    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
+      System.err.println(
+        "WARNING: Running python applications through 'pyspark' is deprecated as of Spark 1.0.\n" +
+        "Use ./bin/spark-submit <python file>");
+      appResource = appArgs.get(0);
+      appArgs.remove(0);
+      return buildCommand(env);
+    }
+
+    // When launching the pyspark shell, the spark-submit arguments should be stored in the
+    // PYSPARK_SUBMIT_ARGS env variable. The executable is taken from the PYSPARK_DRIVER_PYTHON
+    // env variable set by the pyspark script, followed by PYSPARK_DRIVER_PYTHON_OPTS.
+    checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");
+
+    Properties props = loadPropertiesFile();
+    mergeEnvPathList(env, getLibPathEnvName(),
+      firstNonEmptyValue(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, conf, props));
+
+    // Store spark-submit arguments in an environment variable, since there's no way to pass
+    // them to shell.py on the command line.
+    StringBuilder submitArgs = new StringBuilder();
+    for (String arg : buildSparkSubmitArgs()) {
+      if (submitArgs.length() > 0) {
+        submitArgs.append(" ");
+      }
+      submitArgs.append(quoteForPython(arg));
+    }
+    env.put("PYSPARK_SUBMIT_ARGS", submitArgs.toString());
+
+    List<String> pyargs = new ArrayList<String>();
+    pyargs.add(firstNonEmpty(System.getenv("PYSPARK_DRIVER_PYTHON"), "python"));
+    String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
+    if (!isEmpty(pyOpts)) {
+      pyargs.addAll(parseOptionString(pyOpts));
+    }
+
+    return pyargs;
+  }
+
+  private boolean isClientMode(Properties userProps) {
+    String userMaster = firstNonEmpty(master, (String) userProps.get(SparkLauncher.SPARK_MASTER));
+    // Default master is "local[*]", so assume client mode in that case.
+    return userMaster == null ||
+      "client".equals(deployMode) ||
+      (!userMaster.equals("yarn-cluster") && deployMode == null);
+  }
+
+  private class OptionParser extends SparkSubmitOptionParser {
+
+    private final List<String> driverJvmKeys = Arrays.asList(
+      SparkLauncher.DRIVER_EXTRA_CLASSPATH,
+      SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+      SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH,
+      SparkLauncher.DRIVER_MEMORY);
+
+    @Override
+    protected boolean handle(String opt, String value) {
+      if (opt.equals(MASTER)) {
+        master = value;
+      } else if (opt.equals(DEPLOY_MODE)) {
+        deployMode = value;
+      } else if (opt.equals(PROPERTIES_FILE)) {
+        propertiesFile = value;
+      } else if (opt.equals(DRIVER_MEMORY)) {
+        conf.put(SparkLauncher.DRIVER_MEMORY, value);
+      } else if (opt.equals(DRIVER_JAVA_OPTIONS)) {
+        conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
+      } else if (opt.equals(DRIVER_LIBRARY_PATH)) {
+        conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
+      } else if (opt.equals(DRIVER_CLASS_PATH)) {
+        conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
+      } else if (opt.equals(CONF)) {
+        String[] setConf = value.split("=", 2);
+        checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
+        if (driverJvmKeys.contains(setConf[0])) {
+          conf.put(setConf[0], setConf[1]);
+        }
+      } else if (opt.equals(CLASS)) {
+        // The special classes require some special command line handling, since they allow
+        // mixing spark-submit arguments with arguments that should be propagated to the shell
+        // itself. Note that for this to work, the "--class" argument must come before any
+        // non-spark-submit arguments.
+        mainClass = value;
+        if (specialClasses.containsKey(value)) {
+          allowsMixedArguments = true;
+          appResource = specialClasses.get(value);
+        }
+      } else {
+        sparkArgs.add(opt);
+        if (value != null) {
+          sparkArgs.add(value);
+        }
+      }
+      return true;
+    }
+
+    @Override
+    protected boolean handleUnknown(String opt) {
+      // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
+      // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
+      // parameter itself will be interpreted by SparkSubmit as the application resource. The
+      // remaining params will be appended to the list of SparkSubmit arguments.
+      if (allowsMixedArguments) {
+        appArgs.add(opt);
+        return true;
+      } else {
+        sparkArgs.add(opt);
+        return false;
+      }
+    }
+
+    @Override
+    protected void handleExtraArgs(List<String> extra) {
+      for (String arg : extra) {
+        sparkArgs.add(arg);
+      }
+    }
+
+  }
+
+}
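
The driver-memory precedence chain in buildSparkSubmitCommand() is the subtlest part of this file. The following standalone sketch mirrors the same resolution order (explicit conf, then the properties file, then SPARK_DRIVER_MEMORY, then SPARK_MEM, then the 512m default); the firstNonEmpty helper here is an illustrative reimplementation, not the package's own CommandBuilderUtils:

    import java.util.Properties;

    public class MemoryPrecedenceSketch {

      // Illustrative stand-in for the utility the builder relies on:
      // returns the first candidate that is neither null nor empty.
      static String firstNonEmpty(String... candidates) {
        for (String c : candidates) {
          if (c != null && !c.isEmpty()) {
            return c;
          }
        }
        return null;
      }

      public static void main(String[] args) {
        Properties props = new Properties();  // stands in for loadPropertiesFile()
        String explicitConf = null;           // stands in for conf.get(SparkLauncher.DRIVER_MEMORY)

        String memory = firstNonEmpty(
          explicitConf,
          props.getProperty("spark.driver.memory"),
          System.getenv("SPARK_DRIVER_MEMORY"),
          System.getenv("SPARK_MEM"),
          "512m");  // DEFAULT_MEM

        // With nothing set, this prints "-Xms512m -Xmx512m".
        System.out.println("-Xms" + memory + " -Xmx" + memory);
      }
    }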

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
new file mode 100644
index 0000000..8526d2e
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Parser for spark-submit command line options.
+ * <p/>
+ * This class encapsulates the parsing code for spark-submit command line options, so that there
+ * is a single list of options that needs to be maintained (well, sort of, but it makes it harder
+ * to break things).
+ */
+class SparkSubmitOptionParser {
+
+  // The following constants define the "main" name for the available options. They're defined
+  // to avoid copy & paste of the raw strings where they're needed.
+  //
+  // The fields are not static so that they're exposed to Scala code that uses this class. See
+  // SparkSubmitArguments.scala. That is also why this class is not abstract - to allow code to
+  // easily use these constants without having to create dummy implementations of this class.
+  protected final String CLASS = "--class";
+  protected final String CONF = "--conf";
+  protected final String DEPLOY_MODE = "--deploy-mode";
+  protected final String DRIVER_CLASS_PATH = "--driver-class-path";
+  protected final String DRIVER_CORES = "--driver-cores";
+  protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
+  protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
+  protected final String DRIVER_MEMORY = "--driver-memory";
+  protected final String EXECUTOR_MEMORY = "--executor-memory";
+  protected final String FILES = "--files";
+  protected final String JARS = "--jars";
+  protected final String KILL_SUBMISSION = "--kill";
+  protected final String MASTER = "--master";
+  protected final String NAME = "--name";
+  protected final String PACKAGES = "--packages";
+  protected final String PROPERTIES_FILE = "--properties-file";
+  protected final String PROXY_USER = "--proxy-user";
+  protected final String PY_FILES = "--py-files";
+  protected final String REPOSITORIES = "--repositories";
+  protected final String STATUS = "--status";
+  protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";
+
+  // Options that do not take arguments.
+  protected final String HELP = "--help";
+  protected final String SUPERVISE = "--supervise";
+  protected final String VERBOSE = "--verbose";
+  protected final String VERSION = "--version";
+
+  // Standalone-only options.
+
+  // YARN-only options.
+  protected final String ARCHIVES = "--archives";
+  protected final String EXECUTOR_CORES = "--executor-cores";
+  protected final String QUEUE = "--queue";
+  protected final String NUM_EXECUTORS = "--num-executors";
+
+  /**
+   * This is the canonical list of spark-submit options. Each entry in the array contains the
+   * different aliases for the same option; the first element of each entry is the "official"
+   * name of the option, passed to {@link #handle(String, String)}.
+   * <p/>
+   * Options listed neither here nor in the "switch" list below will result in a call to
+   * {@link #handleUnknown(String)}.
+   * <p/>
+   * These two arrays are visible for tests.
+   */
+  final String[][] opts = {
+    { ARCHIVES },
+    { CLASS },
+    { CONF, "-c" },
+    { DEPLOY_MODE },
+    { DRIVER_CLASS_PATH },
+    { DRIVER_CORES },
+    { DRIVER_JAVA_OPTIONS },
+    { DRIVER_LIBRARY_PATH },
+    { DRIVER_MEMORY },
+    { EXECUTOR_CORES },
+    { EXECUTOR_MEMORY },
+    { FILES },
+    { JARS },
+    { KILL_SUBMISSION },
+    { MASTER },
+    { NAME },
+    { NUM_EXECUTORS },
+    { PACKAGES },
+    { PROPERTIES_FILE },
+    { PROXY_USER },
+    { PY_FILES },
+    { QUEUE },
+    { REPOSITORIES },
+    { STATUS },
+    { TOTAL_EXECUTOR_CORES },
+  };
+
+  /**
+   * List of switches (command line options that do not take parameters) recognized by spark-submit.
+   */
+  final String[][] switches = {
+    { HELP, "-h" },
+    { SUPERVISE },
+    { VERBOSE, "-v" },
+    { VERSION },
+  };
+
+  /**
+   * Parse a list of spark-submit command line options.
+   * <p/>
+   * See SparkSubmitArguments.scala for a more formal description of available options.
+   *
+   * @throws IllegalArgumentException If an error is found during parsing.
+   */
+  protected final void parse(List<String> args) {
+    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");
+
+    int idx = 0;
+    for (; idx < args.size(); idx++) {
+      String arg = args.get(idx);
+      String value = null;
+
+      Matcher m = eqSeparatedOpt.matcher(arg);
+      if (m.matches()) {
+        arg = m.group(1);
+        value = m.group(2);
+      }
+
+      // Look for options with a value.
+      String name = findCliOption(arg, opts);
+      if (name != null) {
+        if (value == null) {
+          if (idx == args.size() - 1) {
+            throw new IllegalArgumentException(
+                String.format("Missing argument for option '%s'.", arg));
+          }
+          idx++;
+          value = args.get(idx);
+        }
+        if (!handle(name, value)) {
+          break;
+        }
+        continue;
+      }
+
+      // Look for a switch.
+      name = findCliOption(arg, switches);
+      if (name != null) {
+        if (!handle(name, null)) {
+          break;
+        }
+        continue;
+      }
+
+      if (!handleUnknown(arg)) {
+        break;
+      }
+    }
+
+    if (idx < args.size()) {
+      idx++;
+    }
+    handleExtraArgs(args.subList(idx, args.size()));
+  }
+
+  /**
+   * Callback for when a recognized option is parsed.
+   *
+   * @param opt The long name of the cli option (might differ from the actual command line).
+   * @param value The value. This will be <i>null</i> if the option does not take a value.
+   * @return Whether to continue parsing the argument list.
+   */
+  protected boolean handle(String opt, String value) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Callback for when an unrecognized option is parsed.
+   *
+   * @param opt Unrecognized option from the command line.
+   * @return Whether to continue parsing the argument list.
+   */
+  protected boolean handleUnknown(String opt) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Callback for remaining command line arguments after either {@link #handle(String, String)} or
+   * {@link #handleUnknown(String)} returns "false". This will be called at the end of parsing even
+   * when there are no remaining arguments.
+   *
+   * @param extra List of remaining arguments.
+   */
+  protected void handleExtraArgs(List<String> extra) {
+    throw new UnsupportedOperationException();
+  }
+
+  private String findCliOption(String name, String[][] available) {
+    for (String[] candidates : available) {
+      for (String candidate : candidates) {
+        if (candidate.equals(name)) {
+          return candidates[0];
+        }
+      }
+    }
+    return null;
+  }
+
+}
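
As a usage sketch for the parser above: a subclass only overrides the three callbacks, and returning false from handle() or handleUnknown() stops parsing and routes the rest of the command line to handleExtraArgs(). This hypothetical subclass assumes it is compiled into the org.apache.spark.launcher package (the class and its callbacks are not public) with the launcher classes on the classpath:

    package org.apache.spark.launcher;

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical subclass, for illustration only.
    class EchoingParser extends SparkSubmitOptionParser {

      @Override
      protected boolean handle(String opt, String value) {
        System.out.println("option " + opt + " = " + value);
        return true;  // keep parsing
      }

      @Override
      protected boolean handleUnknown(String opt) {
        System.out.println("unknown " + opt + "; stopping");
        return false; // remaining args go to handleExtraArgs()
      }

      @Override
      protected void handleExtraArgs(List<String> extra) {
        System.out.println("extra " + extra);
      }

      public static void main(String[] args) {
        new EchoingParser().parse(Arrays.asList(
          "--master", "local", "--verbose", "app.jar", "arg1"));
        // Prints: option --master = local, then option --verbose = null
        // (a switch, so no value), then unknown app.jar; stopping,
        // then extra [arg1].
      }
    }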

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/main/java/org/apache/spark/launcher/package-info.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
new file mode 100644
index 0000000..7ed756f
--- /dev/null
+++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Library for launching Spark applications.
+ * <p/>
+ * This library allows applications to launch Spark programmatically. There's only one entry
+ * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
+ * <p/>
+ * To launch a Spark application, just instantiate a {@link org.apache.spark.launcher.SparkLauncher}
+ * and configure the application to run. For example:
+ *
+ * <pre>
+ * {@code
+ *   import org.apache.spark.launcher.SparkLauncher;
+ *
+ *   public class MyLauncher {
+ *     public static void main(String[] args) throws Exception {
+ *       Process spark = new SparkLauncher()
+ *         .setAppResource("/my/app.jar")
+ *         .setMainClass("my.spark.app.Main")
+ *         .setMaster("local")
+ *         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
+ *         .launch();
+ *       spark.waitFor();
+ *     }
+ *   }
+ * }
+ * </pre>
+ */
+package org.apache.spark.launcher;
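
One caveat with the Javadoc example above: launch() returns a plain java.lang.Process, so the child's stdout and stderr must be drained or the child can block once an OS pipe buffer fills. A hedged extension of the same example (the jar path and main class are placeholders, as above):

    import java.io.BufferedReader;
    import java.io.InputStreamReader;

    import org.apache.spark.launcher.SparkLauncher;

    public class MyLauncher {
      public static void main(String[] args) throws Exception {
        Process spark = new SparkLauncher()
          .setAppResource("/my/app.jar")      // placeholder path
          .setMainClass("my.spark.app.Main")  // placeholder class
          .setMaster("local")
          .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
          .launch();

        // Drain stdout so the child does not block on a full pipe buffer.
        BufferedReader out = new BufferedReader(
          new InputStreamReader(spark.getInputStream(), "UTF-8"));
        String line;
        while ((line = out.readLine()) != null) {
          System.out.println(line);
        }
        System.exit(spark.waitFor());
      }
    }

A real caller would drain stderr on a second thread as well, which is exactly what the Redirector helper in SparkLauncherSuite below does.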

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
new file mode 100644
index 0000000..dba0203
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/CommandBuilderUtilsSuite.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import static org.apache.spark.launcher.CommandBuilderUtils.*;
+
+public class CommandBuilderUtilsSuite {
+
+  @Test
+  public void testValidOptionStrings() {
+    testOpt("a b c d e", Arrays.asList("a", "b", "c", "d", "e"));
+    testOpt("a 'b c' \"d\" e", Arrays.asList("a", "b c", "d", "e"));
+    testOpt("a 'b\\\"c' \"'d'\" e", Arrays.asList("a", "b\\\"c", "'d'", "e"));
+    testOpt("a 'b\"c' \"\\\"d\\\"\" e", Arrays.asList("a", "b\"c", "\"d\"", "e"));
+    testOpt(" a b c \\\\ ", Arrays.asList("a", "b", "c", "\\"));
+
+    // Following tests ported from UtilsSuite.scala.
+    testOpt("", new ArrayList<String>());
+    testOpt("a", Arrays.asList("a"));
+    testOpt("aaa", Arrays.asList("aaa"));
+    testOpt("a b c", Arrays.asList("a", "b", "c"));
+    testOpt("  a   b\t c ", Arrays.asList("a", "b", "c"));
+    testOpt("a 'b c'", Arrays.asList("a", "b c"));
+    testOpt("a 'b c' d", Arrays.asList("a", "b c", "d"));
+    testOpt("'b c'", Arrays.asList("b c"));
+    testOpt("a \"b c\"", Arrays.asList("a", "b c"));
+    testOpt("a \"b c\" d", Arrays.asList("a", "b c", "d"));
+    testOpt("\"b c\"", Arrays.asList("b c"));
+    testOpt("a 'b\" c' \"d' e\"", Arrays.asList("a", "b\" c", "d' e"));
+    testOpt("a\t'b\nc'\nd", Arrays.asList("a", "b\nc", "d"));
+    testOpt("a \"b\\\\c\"", Arrays.asList("a", "b\\c"));
+    testOpt("a \"b\\\"c\"", Arrays.asList("a", "b\"c"));
+    testOpt("a 'b\\\"c'", Arrays.asList("a", "b\\\"c"));
+    testOpt("'a'b", Arrays.asList("ab"));
+    testOpt("'a''b'", Arrays.asList("ab"));
+    testOpt("\"a\"b", Arrays.asList("ab"));
+    testOpt("\"a\"\"b\"", Arrays.asList("ab"));
+    testOpt("''", Arrays.asList(""));
+    testOpt("\"\"", Arrays.asList(""));
+  }
+
+  @Test
+  public void testInvalidOptionStrings() {
+    testInvalidOpt("\\");
+    testInvalidOpt("\"abcde");
+    testInvalidOpt("'abcde");
+  }
+
+  @Test
+  public void testWindowsBatchQuoting() {
+    assertEquals("abc", quoteForBatchScript("abc"));
+    assertEquals("\"a b c\"", quoteForBatchScript("a b c"));
+    assertEquals("\"a \"\"b\"\" c\"", quoteForBatchScript("a \"b\" c"));
+    assertEquals("\"a\"\"b\"\"c\"", quoteForBatchScript("a\"b\"c"));
+    assertEquals("\"ab^=\"\"cd\"\"\"", quoteForBatchScript("ab=\"cd\""));
+  }
+
+  @Test
+  public void testPythonArgQuoting() {
+    assertEquals("\"abc\"", quoteForPython("abc"));
+    assertEquals("\"a b c\"", quoteForPython("a b c"));
+    assertEquals("\"a \\\"b\\\" c\"", quoteForPython("a \"b\" c"));
+  }
+
+  private void testOpt(String opts, List<String> expected) {
+    assertEquals(String.format("test string failed to parse: [[ %s ]]", opts),
+        expected, parseOptionString(opts));
+  }
+
+  private void testInvalidOpt(String opts) {
+    try {
+      parseOptionString(opts);
+      fail("Expected exception for invalid option string.");
+    } catch (IllegalArgumentException e) {
+      // pass.
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
new file mode 100644
index 0000000..252d5ab
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * These tests require the Spark assembly to be built before they can be run.
+ */
+public class SparkLauncherSuite {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
+
+  @Test
+  public void testChildProcLauncher() throws Exception {
+    Map<String, String> env = new HashMap<String, String>();
+    env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
+
+    SparkLauncher launcher = new SparkLauncher(env)
+      .setSparkHome(System.getProperty("spark.test.home"))
+      .setMaster("local")
+      .setAppResource("spark-internal")
+      .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
+        "-Dfoo=bar -Dtest.name=-testChildProcLauncher")
+      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
+      .setMainClass(SparkLauncherTestApp.class.getName())
+      .addAppArgs("proc");
+    final Process app = launcher.launch();
+    new Redirector("stdout", app.getInputStream()).start();
+    new Redirector("stderr", app.getErrorStream()).start();
+    assertEquals(0, app.waitFor());
+  }
+
+  public static class SparkLauncherTestApp {
+
+    public static void main(String[] args) throws Exception {
+      assertEquals(1, args.length);
+      assertEquals("proc", args[0]);
+      assertEquals("bar", System.getProperty("foo"));
+      assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER));
+    }
+
+  }
+
+  private static class Redirector extends Thread {
+
+    private final InputStream in;
+
+    Redirector(String name, InputStream in) {
+      this.in = in;
+      setName(name);
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        BufferedReader reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+        String line;
+        while ((line = reader.readLine()) != null) {
+          LOG.warn(line);
+        }
+      } catch (Exception e) {
+        LOG.error("Error reading process output.", e);
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/517975d8/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
----------------------------------------------------------------------
diff --git a/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
new file mode 100644
index 0000000..815edc4
--- /dev/null
+++ b/launcher/src/test/java/org/apache/spark/launcher/SparkSubmitCommandBuilderSuite.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class SparkSubmitCommandBuilderSuite {
+
+  private static File dummyPropsFile;
+  private static SparkSubmitOptionParser parser;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    dummyPropsFile = File.createTempFile("spark", "properties");
+    parser = new SparkSubmitOptionParser();
+  }
+
+  @AfterClass
+  public static void cleanUp() throws Exception {
+    dummyPropsFile.delete();
+  }
+
+  @Test
+  public void testDriverCmdBuilder() throws Exception {
+    testCmdBuilder(true);
+  }
+
+  @Test
+  public void testClusterCmdBuilder() throws Exception {
+    testCmdBuilder(false);
+  }
+
+  @Test
+  public void testCliParser() throws Exception {
+    List<String> sparkSubmitArgs = Arrays.asList(
+      parser.MASTER,
+      "local",
+      parser.DRIVER_MEMORY,
+      "42g",
+      parser.DRIVER_CLASS_PATH,
+      "/driverCp",
+      parser.DRIVER_JAVA_OPTIONS,
+      "extraJavaOpt",
+      parser.CONF,
+      SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath");
+    Map<String, String> env = new HashMap<String, String>();
+    List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+    assertTrue(findInStringList(env.get(CommandBuilderUtils.getLibPathEnvName()),
+        File.pathSeparator, "/driverLibPath"));
+    assertTrue(findInStringList(findArgValue(cmd, "-cp"), File.pathSeparator, "/driverCp"));
+    assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms42g"));
+    assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx42g"));
+  }
+
+  @Test
+  public void testShellCliParser() throws Exception {
+    List<String> sparkSubmitArgs = Arrays.asList(
+      parser.CLASS,
+      "org.apache.spark.repl.Main",
+      parser.MASTER,
+      "foo",
+      "--app-arg",
+      "bar",
+      "--app-switch",
+      parser.FILES,
+      "baz",
+      parser.NAME,
+      "appName");
+
+    List<String> args = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+    List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
+    assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
+  }
+
+  @Test
+  public void testAlternateSyntaxParsing() throws Exception {
+    List<String> sparkSubmitArgs = Arrays.asList(
+      parser.CLASS + "=org.my.Class",
+      parser.MASTER + "=foo",
+      parser.DEPLOY_MODE + "=bar");
+
+    List<String> cmd = new SparkSubmitCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
+    assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
+    assertEquals("foo", findArgValue(cmd, parser.MASTER));
+    assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
+  }
+
+  @Test
+  public void testPySparkLauncher() throws Exception {
+    List<String> sparkSubmitArgs = Arrays.asList(
+      SparkSubmitCommandBuilder.PYSPARK_SHELL,
+      "--master=foo",
+      "--deploy-mode=bar");
+
+    Map<String, String> env = new HashMap<String, String>();
+    List<String> cmd = buildCommand(sparkSubmitArgs, env);
+    assertEquals("python", cmd.get(cmd.size() - 1));
+    assertEquals(
+      String.format("\"%s\" \"foo\" \"%s\" \"bar\" \"%s\"",
+        parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.PYSPARK_SHELL_RESOURCE),
+      env.get("PYSPARK_SUBMIT_ARGS"));
+  }
+
+  @Test
+  public void testPySparkFallback() throws Exception {
+    List<String> sparkSubmitArgs = Arrays.asList(
+      "--master=foo",
+      "--deploy-mode=bar",
+      "script.py",
+      "arg1");
+
+    Map<String, String> env = new HashMap<String, String>();
+    List<String> cmd = buildCommand(sparkSubmitArgs, env);
+
+    assertEquals("foo", findArgValue(cmd, "--master"));
+    assertEquals("bar", findArgValue(cmd, "--deploy-mode"));
+    assertEquals("script.py", cmd.get(cmd.size() - 2));
+    assertEquals("arg1", cmd.get(cmd.size() - 1));
+  }
+
+  private void testCmdBuilder(boolean isDriver) throws Exception {
+    String deployMode = isDriver ? "client" : "cluster";
+
+    SparkSubmitCommandBuilder launcher =
+      new SparkSubmitCommandBuilder(Collections.<String>emptyList());
+    launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
+      System.getProperty("spark.test.home"));
+    launcher.master = "yarn";
+    launcher.deployMode = deployMode;
+    launcher.appResource = "/foo";
+    launcher.appName = "MyApp";
+    launcher.mainClass = "my.Class";
+    launcher.propertiesFile = dummyPropsFile.getAbsolutePath();
+    launcher.appArgs.add("foo");
+    launcher.appArgs.add("bar");
+    launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
+    launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
+    launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, "-Ddriver -XX:MaxPermSize=256m");
+    launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
+    launcher.conf.put("spark.foo", "foo");
+
+    Map<String, String> env = new HashMap<String, String>();
+    List<String> cmd = launcher.buildCommand(env);
+
+    // Checks below are different for driver and non-driver mode.
+
+    if (isDriver) {
+      assertTrue("Driver -Xms should be configured.", cmd.contains("-Xms1g"));
+      assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
+    } else {
+      boolean found = false;
+      for (String arg : cmd) {
+        if (arg.startsWith("-Xms") || arg.startsWith("-Xmx")) {
+          found = true;
+          break;
+        }
+      }
+      assertFalse("Memory arguments should not be set.", found);
+    }
+
+    for (String arg : cmd) {
+      if (arg.startsWith("-XX:MaxPermSize=")) {
+        if (isDriver) {
+          assertEquals("-XX:MaxPermSize=256m", arg);
+        } else {
+          assertEquals("-XX:MaxPermSize=128m", arg);
+        }
+      }
+    }
+
+    String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));
+    if (isDriver) {
+      assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp));
+    } else {
+      assertFalse("Driver classpath should not be in command.", contains("/driver", cp));
+    }
+
+    String libPath = env.get(CommandBuilderUtils.getLibPathEnvName());
+    if (isDriver) {
+      assertNotNull("Native library path should be set.", libPath);
+      assertTrue("Native library path should contain provided entry.",
+        contains("/native", libPath.split(Pattern.quote(File.pathSeparator))));
+    } else {
+      assertNull("Native library path should not be set.", libPath);
+    }
+
+    // Checks below are the same for both driver and non-driver mode.
+    assertEquals(dummyPropsFile.getAbsolutePath(), findArgValue(cmd, parser.PROPERTIES_FILE));
+    assertEquals("yarn", findArgValue(cmd, parser.MASTER));
+    assertEquals(deployMode, findArgValue(cmd, parser.DEPLOY_MODE));
+    assertEquals("my.Class", findArgValue(cmd, parser.CLASS));
+    assertEquals("MyApp", findArgValue(cmd, parser.NAME));
+
+    boolean appArgsOk = false;
+    for (int i = 0; i < cmd.size(); i++) {
+      if (cmd.get(i).equals("/foo")) {
+        assertEquals("foo", cmd.get(i + 1));
+        assertEquals("bar", cmd.get(i + 2));
+        assertEquals(cmd.size(), i + 3);
+        appArgsOk = true;
+        break;
+      }
+    }
+    assertTrue("App resource and args should be added to command.", appArgsOk);
+
+    Map<String, String> conf = parseConf(cmd, parser);
+    assertEquals("foo", conf.get("spark.foo"));
+  }
+
+  private boolean contains(String needle, String[] haystack) {
+    for (String entry : haystack) {
+      if (entry.equals(needle)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Map<String, String> parseConf(List<String> cmd, SparkSubmitOptionParser parser) {
+    Map<String, String> conf = new HashMap<String, String>();
+    for (int i = 0; i < cmd.size(); i++) {
+      if (cmd.get(i).equals(parser.CONF)) {
+        String[] val = cmd.get(i + 1).split("=", 2);
+        conf.put(val[0], val[1]);
+        i += 1;
+      }
+    }
+    return conf;
+  }
+
+  private String findArgValue(List<String> cmd, String name) {
+    for (int i = 0; i < cmd.size(); i++) {
+      if (cmd.get(i).equals(name)) {
+        return cmd.get(i + 1);
+      }
+    }
+    fail(String.format("arg '%s' not found", name));
+    return null;
+  }
+
+  private boolean findInStringList(String list, String sep, String needle) {
+    return contains(needle, list.split(sep));
+  }
+
+  private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
+    SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
+    builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
+    return builder.buildCommand(env);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

