beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-1437] Spark runner StreamingListeners are not recoverable.
Date Fri, 10 Feb 2017 15:45:30 GMT
Repository: beam
Updated Branches:
  refs/heads/master 712a1d6e3 -> 9c4a784bb


[BEAM-1437] Spark runner StreamingListeners are not recoverable.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d757dc1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d757dc1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d757dc1

Branch: refs/heads/master
Commit: 4d757dc19bc8d6af39a0a6bfb0e117e86b55ae74
Parents: 712a1d6
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Thu Feb 9 19:40:31 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Fri Feb 10 17:26:11 2017 +0200

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/spark/SparkRunner.java     | 7 +++++++
 .../streaming/SparkRunnerStreamingContextFactory.java       | 9 ---------
 2 files changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4d757dc1/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 578ed21..46492f8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -57,6 +57,7 @@ import org.apache.spark.SparkEnv$;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.metrics.MetricsSystem;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
 import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -174,6 +175,12 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult>
{
           new JavaStreamingListenerWrapper(
               new AccumulatorSingleton.AccumulatorCheckpointingSparkListener()));
 
+      // register listeners.
+      for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners())
{
+        LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
+        jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+      }
+
       startPipeline = executorService.submit(new Runnable() {
 
         @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/4d757dc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index 6d254e1..b461856 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.spark.translation.streaming;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
-import org.apache.beam.runners.spark.SparkContextOptions;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
@@ -35,8 +34,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
-import org.apache.spark.streaming.api.java.JavaStreamingListener;
-import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,12 +84,6 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
 
     checkpoint(jssc);
 
-    // register listeners.
-    for (JavaStreamingListener listener: options.as(SparkContextOptions.class).getListeners())
{
-      LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
-      jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
-    }
-
     return jssc;
   }
 


Mime
View raw message