beam-commits mailing list archives

From ieme...@apache.org
Subject [1/2] beam git commit: [BEAM-3206] Shut down executor when spark runner finishes
Date Fri, 17 Nov 2017 14:16:26 GMT
Repository: beam
Updated Branches:
  refs/heads/master 0df7ba9d6 -> f10399d7c


[BEAM-3206] Shut down executor when spark runner finishes

The Spark runner previously left the JVM process hanging after
completion because its one-time use executor service was never shut
down. This change shuts down the executor after jobs have been
submitted, allowing graceful JVM termination.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b5b27333
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b5b27333
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b5b27333

Branch: refs/heads/master
Commit: b5b2733338e7a0d5dd373b7a19bea315b3b1c692
Parents: 0df7ba9
Author: Ben Sidhom <sidhom@google.com>
Authored: Wed Nov 15 16:05:49 2017 -0800
Committer: Ismaël Mejía <iemejia@gmail.com>
Committed: Fri Nov 17 15:15:08 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/runners/spark/SparkRunner.java   | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b5b27333/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 98ca1be..4a409cb 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -190,6 +190,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
                   jssc.start();
                 }
               });
+      executorService.shutdown();
 
       result = new SparkPipelineResult.StreamingMode(startPipeline, jssc);
     } else {
@@ -214,6 +215,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
                   LOG.info("Batch pipeline execution complete.");
                 }
               });
+      executorService.shutdown();
 
       result = new SparkPipelineResult.BatchMode(startPipeline, jsc);
     }
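
The effect described in the commit message can be illustrated with a minimal standalone sketch. It is not the SparkRunner code: the class name ExecutorShutdownSketch and the method runAndShutDown are hypothetical, and the use of Executors.newSingleThreadExecutor() is an assumption about how the executor is created. Only the variable names executorService and startPipeline are borrowed from the diff above. The sketch shows that shutdown() stops new submissions while letting an already-submitted task finish, after which the executor's non-daemon worker thread exits and no longer keeps the JVM alive.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorShutdownSketch {

  /** Submits one task to a single-use executor, then shuts the executor down. */
  static String runAndShutDown() throws Exception {
    // Assumption: a single-thread executor. Its default thread factory creates
    // non-daemon threads, so an executor that is never shut down can keep the
    // JVM from exiting.
    ExecutorService executorService = Executors.newSingleThreadExecutor();

    // Stand-in for the task that starts the pipeline.
    Future<String> startPipeline = executorService.submit(() -> "pipeline started");

    // shutdown() rejects new tasks but lets the submitted one run to completion.
    executorService.shutdown();

    String result = startPipeline.get(5, TimeUnit.SECONDS);
    if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
      throw new IllegalStateException("executor did not terminate");
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(runAndShutDown());
  }
}
```

Removing the shutdown() call from the sketch leaves the worker thread idle but alive after main returns, which is the hanging behaviour the commit addresses.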

