beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [1/5] incubator-beam git commit: [BEAM-619] keep track of local split sources in UnboundedSourceWrapper
Date Fri, 09 Sep 2016 14:12:27 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master a96ea98a4 -> b6205ffa3


[BEAM-619] keep track of local split sources in UnboundedSourceWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/145ad47d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/145ad47d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/145ad47d

Branch: refs/heads/master
Commit: 145ad47d9f945f816be7a91001cdf7cb3b6a7fac
Parents: be689df
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Sep 7 13:07:15 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Sep 7 13:15:54 2016 +0200

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    | 79 +++++++++++---------
 1 file changed, 43 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/145ad47d/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 8647322..2cd06ed 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -58,7 +58,7 @@ public class UnboundedSourceWrapper<
   private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
 
   /**
-   * Keep the options so that we can initialize the readers.
+   * Keep the options so that we can initialize the localReaders.
    */
   private final SerializedPipelineOptions serializedOptions;
 
@@ -72,13 +72,19 @@ public class UnboundedSourceWrapper<
    * The split sources. We split them in the constructor to ensure that all parallel
    * sources are consistent about the split sources.
    */
-  private List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
+  private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
 
   /**
+   * The local split sources. Assigned at runtime when the wrapper is executed in parallel.
+   */
+  private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
+
+  /**
+   * The local split readers. Assigned at runtime when the wrapper is executed in parallel.
    * Make it a field so that we can access it in {@link #trigger(long)} for
    * emitting watermarks.
    */
-  private transient List<UnboundedSource.UnboundedReader<OutputT>> readers;
+  private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
 
   /**
    * Initialize here and not in run() to prevent races where we cancel a job before run() is
@@ -149,26 +155,15 @@ public class UnboundedSourceWrapper<
     int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
     int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
 
-    List<UnboundedSource<OutputT, CheckpointMarkT>> localSources = new ArrayList<>();
-
-    for (int i = 0; i < splitSources.size(); i++) {
-      if (i % numSubtasks == subtaskIndex) {
-        localSources.add(splitSources.get(i));
-      }
-    }
+    localSplitSources = new ArrayList<>();
+    localReaders = new ArrayList<>();
 
-    LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
-        subtaskIndex,
-        numSubtasks,
-        localSources);
-
-    readers = new ArrayList<>();
     if (restoredState != null) {
 
       // restore the splitSources from the checkpoint to ensure consistent ordering
       // do it using a transform because otherwise we would have to do
       // unchecked casts
-      splitSources = Lists.transform(
+      localSplitSources = Lists.transform(
           restoredState,
           new Function<
               KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>,
@@ -182,19 +177,31 @@ public class UnboundedSourceWrapper<
 
       for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
           restoredState) {
-        readers.add(
+        localReaders.add(
             restored.getKey().createReader(
                 serializedOptions.getPipelineOptions(), restored.getValue()));
       }
       restoredState = null;
     } else {
-      // initialize readers from scratch
-      for (UnboundedSource<OutputT, CheckpointMarkT> source : localSources) {
-        readers.add(source.createReader(serializedOptions.getPipelineOptions(), null));
+      // initialize localReaders and localSources from scratch
+      for (int i = 0; i < splitSources.size(); i++) {
+        if (i % numSubtasks == subtaskIndex) {
+          UnboundedSource<OutputT, CheckpointMarkT> source =
+              splitSources.get(i);
+          UnboundedSource.UnboundedReader<OutputT> reader =
+              source.createReader(serializedOptions.getPipelineOptions(), null);
+          localSplitSources.add(source);
+          localReaders.add(reader);
+        }
       }
     }
 
-    if (readers.size() == 0) {
+    LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
+        subtaskIndex,
+        numSubtasks,
+        localSplitSources);
+
+    if (localReaders.size() == 0) {
       // do nothing, but still look busy ...
       // also, output a Long.MAX_VALUE watermark since we know that we're not
       // going to emit anything
@@ -218,9 +225,9 @@ public class UnboundedSourceWrapper<
           }
         }
       }
-    } else if (readers.size() == 1) {
+    } else if (localReaders.size() == 1) {
       // the easy case, we just read from one reader
-      UnboundedSource.UnboundedReader<OutputT> reader = readers.get(0);
+      UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
 
       boolean dataAvailable = reader.start();
       if (dataAvailable) {
@@ -239,25 +246,25 @@ public class UnboundedSourceWrapper<
         }
       }
     } else {
-      // a bit more complicated, we are responsible for several readers
+      // a bit more complicated, we are responsible for several localReaders
       // loop through them and sleep if none of them had any data
 
-      int numReaders = readers.size();
+      int numReaders = localReaders.size();
       int currentReader = 0;
 
       // start each reader and emit data if immediately available
-      for (UnboundedSource.UnboundedReader<OutputT> reader : readers) {
+      for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
         boolean dataAvailable = reader.start();
         if (dataAvailable) {
           emitElement(ctx, reader);
         }
       }
 
-      // a flag telling us whether any of the readers had data
+      // a flag telling us whether any of the localReaders had data
       // if no reader had data, sleep for bit
       boolean hadData = false;
       while (isRunning) {
-        UnboundedSource.UnboundedReader<OutputT> reader = readers.get(currentReader);
+        UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
         boolean dataAvailable = reader.advance();
 
         if (dataAvailable) {
@@ -298,8 +305,8 @@ public class UnboundedSourceWrapper<
   @Override
   public void close() throws Exception {
     super.close();
-    if (readers != null) {
-      for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+    if (localReaders != null) {
+      for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
         reader.close();
       }
     }
@@ -324,9 +331,9 @@ public class UnboundedSourceWrapper<
     List<KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> checkpoints =
         new ArrayList<>();
 
-    for (int i = 0; i < splitSources.size(); i++) {
-      UnboundedSource<OutputT, CheckpointMarkT> source = splitSources.get(i);
-      UnboundedSource.UnboundedReader<OutputT> reader = readers.get(i);
+    for (int i = 0; i < localSplitSources.size(); i++) {
+      UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i);
+      UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(i);
 
       @SuppressWarnings("unchecked")
       CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
@@ -357,9 +364,9 @@ public class UnboundedSourceWrapper<
   public void trigger(long timestamp) throws Exception {
     if (this.isRunning) {
       synchronized (context.getCheckpointLock()) {
-        // find minimum watermark over all readers
+        // find minimum watermark over all localReaders
         long watermarkMillis = Long.MAX_VALUE;
-        for (UnboundedSource.UnboundedReader<OutputT> reader: readers) {
+        for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
           Instant watermark = reader.getWatermark();
           if (watermark != null) {
             watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);


Mime
View raw message