beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [2/9] beam git commit: Advance watermarks onBatchCompleted hook.
Date Mon, 20 Feb 2017 09:57:58 GMT
Advance watermarks onBatchCompleted hook.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa31f18e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa31f18e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa31f18e

Branch: refs/heads/master
Commit: fa31f18e489d4cbe44fe4a9be7ba3d7dbee7c354
Parents: bbf3744
Author: Sela <ansela@paypal.com>
Authored: Sun Feb 12 18:31:14 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Mon Feb 20 11:30:14 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/spark/SparkRunner.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fa31f18e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ebac375..52a080b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -39,6 +39,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -191,12 +192,15 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
           new JavaStreamingListenerWrapper(
               new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
 
-      // register listeners.
+      // register user-defined listeners.
       for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) {
         LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
         jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
       }
 
+      // register Watermarks listener to broadcast the advanced WMs.
+      jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc)));
+
       startPipeline = executorService.submit(new Runnable() {
 
         @Override


Mime
View raw message