beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [2/3] beam git commit: [BEAM-1405] Skip stopping context when spark context is provided
Date Tue, 07 Feb 2017 15:01:41 GMT
[BEAM-1405] Skip stopping context when spark context is provided


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5edcdff9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5edcdff9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5edcdff9

Branch: refs/heads/master
Commit: 5edcdff9ac6aee552cb95fe46f32a9f57dd813e1
Parents: 92707b9
Author: Ismaël Mejía <iemejia@gmail.com>
Authored: Tue Feb 7 14:43:11 2017 +0100
Committer: Ismaël Mejía <iemejia@gmail.com>
Committed: Tue Feb 7 14:47:04 2017 +0100

----------------------------------------------------------------------
 .../runners/spark/translation/SparkContextFactory.java   | 11 ++++++++---
 .../beam/runners/spark/ProvidedSparkContextTest.java     |  2 ++
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5edcdff9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 67839a8..326838a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -40,17 +40,21 @@ public final class SparkContextFactory {
    */
   static final String TEST_REUSE_SPARK_CONTEXT = "beam.spark.test.reuseSparkContext";
 
+  // Spark allows only one context for JVM so this can be static.
   private static JavaSparkContext sparkContext;
   private static String sparkMaster;
+  private static boolean usesProvidedSparkContext;
 
   private SparkContextFactory() {
   }
 
   public static synchronized JavaSparkContext getSparkContext(SparkPipelineOptions options) {
     SparkContextOptions contextOptions = options.as(SparkContextOptions.class);
+    usesProvidedSparkContext = contextOptions.getUsesProvidedSparkContext();
     // reuse should be ignored if the context is provided.
     if (Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)
-        && !contextOptions.getUsesProvidedSparkContext()) {
+        && !usesProvidedSparkContext) {
+
       // if the context is null or stopped for some reason, re-create it.
       if (sparkContext == null || sparkContext.sc().isStopped()) {
         sparkContext = createSparkContext(contextOptions);
@@ -67,13 +71,14 @@ public final class SparkContextFactory {
   }
 
   public static synchronized void stopSparkContext(JavaSparkContext context) {
-    if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
+    if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)
+            && !usesProvidedSparkContext) {
       context.stop();
     }
   }
 
   private static JavaSparkContext createSparkContext(SparkContextOptions contextOptions) {
-    if (contextOptions.getUsesProvidedSparkContext()) {
+    if (usesProvidedSparkContext) {
       LOG.info("Using a provided Spark Context");
       JavaSparkContext jsc = contextOptions.getProvidedSparkContext();
       if (jsc == null || jsc.sc().isStopped()){

http://git-wip-us.apache.org/repos/asf/beam/blob/5edcdff9/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index 00c894d..a4190a9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -57,6 +57,8 @@ public class ProvidedSparkContextTest {
     public void testWithProvidedContext() throws Exception {
         JavaSparkContext jsc = new JavaSparkContext("local[*]", "Existing_Context");
         testWithValidProvidedContext(jsc);
+        // A provided context must not be stopped after execution
+        assertFalse(jsc.sc().isStopped());
         jsc.stop();
     }
 


Mime
View raw message