pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1710718 - in /pig/branches/spark/src: docs/src/documentation/content/xdocs/start.xml org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Date Tue, 27 Oct 2015 03:40:14 GMT
Author: xuefu
Date: Tue Oct 27 03:40:14 2015
New Revision: 1710718

URL: http://svn.apache.org/viewvc?rev=1710718&view=rev
Log:
PIG-4698: Enable dynamic resource allocation/de-allocation on Yarn backends (Srikanth via
Xuefu)

Modified:
    pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1710718&r1=1710717&r2=1710718&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Tue Oct 27 03:40:14
2015
@@ -140,7 +140,7 @@ Test the Pig installation with this simp
 </li>
 <li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access
to a Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x tez).
 </li>
-<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access
to a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using the -x flag
(-x spark). In Spark execution mode, it is necessary to set env::SPARK_MASTER to an appropriate
value (local - local mode, yarn-client - yarn-client mode, mesos://host:port - spark on mesos
or spark://host:port - spark cluster. For more information refer to spark documentation on
Master Urls, <em>yarn-cluster mode is currently not supported</em>)
+<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access
to a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using the -x flag
(-x spark). In Spark execution mode, it is necessary to set env::SPARK_MASTER to an appropriate
value (local - local mode, yarn-client - yarn-client mode, mesos://host:port - spark on mesos
or spark://host:port - spark cluster. For more information refer to spark documentation on
Master URLs, <em>yarn-cluster mode is currently not supported</em>). Pig scripts
run on Spark can take advantage of the <a href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation">dynamic
allocation</a> feature. It can be enabled by setting <em>spark.dynamicAllocation.enabled</em> to <em>true</em>.
Refer to spark <a href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation">configuration</a>
for additional configuration details. In general all properties in the pig script prefixed
with
  <em>spark.</em> are copied to the Spark Application Configuration. Please note
that the Yarn auxiliary service needs to be enabled for this to work. See the Spark documentation
for additional details.
 </li>
 </ul>
 <p></p>

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1710718&r1=1710717&r2=1710718&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
Tue Oct 27 03:40:14 2015
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
 
@@ -110,6 +111,7 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
@@ -377,7 +379,7 @@ public class SparkLauncher extends Launc
                     Paths.get(localFile.getAbsolutePath()));
         } else {
             sparkContext.addFile(jarFile.toURI().toURL()
-                    .toExternalForm());
+                .toExternalForm());
         }
     }
 
@@ -454,14 +456,50 @@ public class SparkLauncher extends Launc
                 }
             }
 
-            sparkContext = new JavaSparkContext(master, "PigOnSpark", sparkHome,
-                    jars.toArray(new String[jars.size()]));
+            SparkConf sparkConf = new SparkConf();
+            Properties pigCtxtProperties = pc.getProperties();
+
+            sparkConf.setMaster(master);
+            sparkConf.setAppName("PigOnSpark:" + pigCtxtProperties.getProperty(PigContext.JOB_NAME));
+            sparkConf.setJars(jars.toArray(new String[jars.size()]));
+            if (sparkHome != null && !sparkHome.isEmpty()) {
+                sparkConf.setSparkHome(sparkHome);
+            } else {
+                LOG.warn("SPARK_HOME is not set");
+            }
+
+            //Copy all spark.* properties to SparkConf
+            for (String key : pigCtxtProperties.stringPropertyNames()) {
+                if (key.startsWith("spark.")) {
+                    LOG.debug("Copying key " + key + " with value " +
+                        pigCtxtProperties.getProperty(key) + " to SparkConf");
+                    sparkConf.set(key, pigCtxtProperties.getProperty(key));
+                }
+            }
+
+            checkAndConfigureDynamicAllocation(master, sparkConf);
+
+            sparkContext = new JavaSparkContext(sparkConf);
             sparkContext.sc().addSparkListener(new StatsReportListener());
             sparkContext.sc().addSparkListener(new JobLogger());
             sparkContext.sc().addSparkListener(jobMetricsListener);
         }
     }
 
+    private static void checkAndConfigureDynamicAllocation(String master, SparkConf sparkConf)
{
+        if (sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)) {
+            if (!master.startsWith("yarn")) {
+                LOG.warn("Dynamic allocation is enabled, but " +
+                    "script isn't running on yarn. Ignoring ...");
+            }
+            if (!sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {
+                LOG.info("Spark shuffle service is being enabled as dynamic " +
+                    "allocation is enabled");
+                sparkConf.set("spark.shuffle.service.enabled", "true");
+            }
+        }
+    }
+
     // You can use this in unit tests to stop the SparkContext between tests.
     static void stopSpark() {
         if (sparkContext != null) {



Mime
View raw message