nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From taegeo...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-237] Refactor ParentTaskDataFetcher to emit streaming data and watermark (#138)
Date Thu, 01 Nov 2018 00:32:47 GMT
This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 53522c6  [NEMO-237] Refactor ParentTaskDataFetcher to emit streaming data and watermark
(#138)
53522c6 is described below

commit 53522c6c0faaef5668eb29420ab3ca1fc0b2fa77
Author: John Yang <johnyangk@gmail.com>
AuthorDate: Thu Nov 1 09:32:42 2018 +0900

    [NEMO-237] Refactor ParentTaskDataFetcher to emit streaming data and watermark (#138)
    
    JIRA: [NEMO-237: Refactor ParentTaskDataFetcher to emit streaming data and watermark](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-237)
    
    **Major changes:**
    - Introduces `MultiThreadParentTaskDataFetcher` that does not block, and consumes N iterables concurrently
    - Makes 'Pipe' edges use `MultiThreadParentTaskDataFetcher`
    
    **Minor changes to note:**
    - Fixes bugs in TaskExecutor polling logic (thanks to @taegeonum)
    
    **Tests for the changes:**
    - Streaming tests `WindowedWordCountITCase` use the added `MultiThreadParentTaskDataFetcher`
    
    **Other comments:**
    - Will handle watermarks in a different PR (after the OutputWriters are fixed)
    
    Closes #138
---
 .../task/MultiThreadParentTaskDataFetcher.java     | 150 +++++++++++++++++++++
 .../executor/task/ParentTaskDataFetcher.java       |   1 -
 .../nemo/runtime/executor/task/TaskExecutor.java   |  38 ++++--
 3 files changed, 176 insertions(+), 13 deletions(-)

diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
new file mode 100644
index 0000000..a9b0da3
--- /dev/null
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.executor.task;
+
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.runtime.executor.data.DataUtil;
+import org.apache.nemo.runtime.executor.datatransfer.InputReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.*;
+
+/**
+ * Task thread -> fetchDataElement() -> (((QUEUE))) <- List of iterators <- queueInsertionThreads
+ *
+ * Unlike {@link ParentTaskDataFetcher}, where the task thread directly consumes (and blocks on) iterators one by one,
+ * this class spawns threads that each forwards elements from an iterator to a global queue.
+ *
+ * This class should be used when dealing with unbounded data streams, as we do not want to be blocked on a
+ * single unbounded iterator forever.
+ *
+ * How the pieces fit together:
+ * - The first fetchDataElement() call runs fetchDataLazily(), which asks the reader for its iterator futures and,
+ *   as each future completes, submits one forwarding task to the queueInsertionThreads pool.
+ * - Each forwarding task drains its iterator into elementQueue, then records that iterator's byte counts, and only
+ *   then enqueues a single Finishmark for it.
+ * - fetchDataElement() polls elementQueue without blocking: an empty queue is reported to the caller as a
+ *   NoSuchElementException, and a Finishmark is returned only after one has been polled for every iterator.
+ *
+ * NOTE(review): when a future completes exceptionally, the forwarding task prints the stack trace and throws, but the
+ * Future returned by submit() is discarded and no Finishmark is enqueued for that iterator. It looks like the task
+ * thread would then keep receiving NoSuchElementException instead of seeing the failure - confirm.
+ * NOTE(review): the byte counters are set to -1 when an iterator does not support byte counting, yet later iterators
+ * still add to them with +=. Presumably -1 is meant as a sticky "unknown" marker - confirm.
+ * NOTE(review): close() only calls shutdown(), which does not interrupt tasks that are already running; a forwarding
+ * task blocked on an unbounded iterator may therefore outlive close() - confirm this is intended.
+ */
+@NotThreadSafe
+class MultiThreadParentTaskDataFetcher extends DataFetcher {
+  private static final Logger LOG = LoggerFactory.getLogger(MultiThreadParentTaskDataFetcher.class);
+
+  private final InputReader readersForParentTask; // Source of the iterator futures; read() is called once, lazily.
+  private final ExecutorService queueInsertionThreads; // Cached pool running one forwarding task per iterator.
+
+  // Non-finals (lazy fetching): true until the first fetchDataElement() call has started the forwarding tasks.
+  private boolean firstFetch = true;
+
+  private final ConcurrentLinkedQueue elementQueue; // Raw-typed: holds both data elements and Finishmark instances.
+
+  private long serBytes = 0; // Running total updated by countBytesSynchronized(); may be set to -1 there.
+  private long encodedBytes = 0; // Running total updated by countBytesSynchronized(); may be set to -1 there.
+
+  private int numOfIterators; // Number of futures returned by the reader; set in fetchDataLazily().
+  private int numOfFinishMarks = 0; // Finishmarks polled from the queue so far.
+
+  MultiThreadParentTaskDataFetcher(final IRVertex dataSource,
+                                   final InputReader readerForParentTask,
+                                   final OutputCollector outputCollector) {
+    super(dataSource, outputCollector);
+    this.readersForParentTask = readerForParentTask;
+    this.firstFetch = true;
+    this.elementQueue = new ConcurrentLinkedQueue();
+    this.queueInsertionThreads = Executors.newCachedThreadPool();
+  }
+
+  @Override
+  Object fetchDataElement() throws IOException, NoSuchElementException {
+    if (firstFetch) { // Start the forwarding tasks on the first call only.
+      fetchDataLazily();
+      firstFetch = false;
+    }
+
+    while (true) {
+      final Object element = elementQueue.poll(); // Non-blocking: null means the queue is currently empty.
+      if (element == null) {
+        throw new NoSuchElementException(); // Nothing available right now; the caller decides when to retry.
+      } else if (element instanceof Finishmark) {
+        numOfFinishMarks++;
+        if (numOfFinishMarks == numOfIterators) { // Every iterator has finished: report a single Finishmark.
+          return Finishmark.getInstance();
+        }
+        // else some iterators are still running: swallow this Finishmark and poll again.
+      } else {
+        return element;
+      }
+    }
+  }
+
+  private void fetchDataLazily() { // Requests the iterators and wires each one to a forwarding task.
+    final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures = readersForParentTask.read();
+    numOfIterators = futures.size();
+
+    futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) ->
{
+      // A thread for each iterator (the task is submitted whether the future succeeded or failed)
+      queueInsertionThreads.submit(() -> {
+        if (exception == null) {
+          // Consume this iterator to the end.
+          while (iterator.hasNext()) { // blocked on the iterator.
+            final Object element = iterator.next();
+            elementQueue.offer(element);
+          }
+
+          // This iterator is finished: record its byte counts first, then signal completion through the queue.
+          countBytesSynchronized(iterator);
+          elementQueue.offer(Finishmark.getInstance());
+        } else {
+          exception.printStackTrace();
+          throw new RuntimeException(exception); // NOTE(review): thrown inside the pool task - see class comment.
+        }
+      });
+
+    }));
+  }
+
+  final long getSerializedBytes() { // Returns the current serBytes value; read without synchronization.
+    return serBytes;
+  }
+
+  final long getEncodedBytes() { // Returns the current encodedBytes value; read without synchronization.
+    return encodedBytes;
+  }
+
+  private synchronized void countBytesSynchronized(final DataUtil.IteratorWithNumBytes iterator)
{
+    try { // Serialized size: added to the total, or the total is set to -1 if the iterator cannot report it.
+      serBytes += iterator.getNumSerializedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      serBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of serialized data - the data is not ready
yet ", e);
+    }
+    try { // Encoded size: same handling as the serialized size above.
+      encodedBytes += iterator.getNumEncodedBytes();
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      encodedBytes = -1;
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of encoded data - the data is not ready
yet ", e);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    queueInsertionThreads.shutdown(); // Stops accepting new tasks; already-submitted tasks are not interrupted.
+  }
+}
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index e546ee7..80bbea2 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -69,7 +69,6 @@ class ParentTaskDataFetcher extends DataFetcher {
       }
 
       while (true) {
-
         // This iterator has the element
         if (this.currentIterator.hasNext()) {
           return this.currentIterator.next();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index a2da4bc..1541792 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -229,9 +229,18 @@ public final class TaskExecutor {
       nonBroadcastInEdges.removeAll(broadcastInEdges);
       final List<InputReader> nonBroadcastReaders =
         getParentTaskReaders(taskIndex, nonBroadcastInEdges, intermediateDataIOFactory);
-      nonBroadcastReaders.forEach(parentTaskReader -> nonBroadcastDataFetcherList.add(
-        new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-          new DataFetcherOutputCollector((OperatorVertex) irVertex))));
+      nonBroadcastReaders.forEach(parentTaskReader -> {
+        final DataFetcher dataFetcher;
+        if (parentTaskReader instanceof PipeInputReader) {
+          nonBroadcastDataFetcherList.add(
+            new MultiThreadParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
+        } else {
+          nonBroadcastDataFetcherList.add(
+            new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
+              new DataFetcherOutputCollector((OperatorVertex) irVertex)));
+        }
+      });
     });
 
     final List<VertexHarness> sortedHarnessList = irVertexDag.getTopologicalSort()
@@ -317,11 +326,9 @@ public final class TaskExecutor {
    * If the element is an instance of Finishmark, we remove the dataFetcher from the current
list.
    * @param element element
    * @param dataFetcher current data fetcher
-   * @param dataFetchers current list
    */
   private void handleElement(final Object element,
-                             final DataFetcher dataFetcher,
-                             final List<DataFetcher> dataFetchers) {
+                             final DataFetcher dataFetcher) {
     if (element instanceof Finishmark) {
       // We've consumed all the data from this data fetcher.
       if (dataFetcher instanceof SourceVertexDataFetcher) {
@@ -329,10 +336,10 @@ public final class TaskExecutor {
       } else if (dataFetcher instanceof ParentTaskDataFetcher) {
         serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes();
         encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes();
+      } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) {
+        serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes();
+        encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes();
       }
-
-      // remove current data fetcher from the list
-      dataFetchers.remove(dataFetcher);
     } else if (element instanceof Watermark) {
       // Watermark
       processWatermark(dataFetcher.getOutputCollector(), (Watermark) element);
@@ -388,7 +395,11 @@ public final class TaskExecutor {
       while (availableIterator.hasNext()) {
         final DataFetcher dataFetcher = availableIterator.next();
         try {
-          handleElement(dataFetcher.fetchDataElement(), dataFetcher, availableFetchers);
+          final Object element = dataFetcher.fetchDataElement();
+          handleElement(element, dataFetcher);
+          if (element instanceof Finishmark) {
+            availableIterator.remove();
+          }
         } catch (final NoSuchElementException e) {
           // No element in current data fetcher, fetch data from next fetcher
           // move current data fetcher to pending.
@@ -412,12 +423,15 @@ public final class TaskExecutor {
 
         final DataFetcher dataFetcher = pendingIterator.next();
         try {
-          handleElement(dataFetcher.fetchDataElement(), dataFetcher, pendingFetchers);
+          final Object element = dataFetcher.fetchDataElement();
+          handleElement(element, dataFetcher);
 
           // We processed data. This means the data fetcher is now available.
           // Add current data fetcher to available
           pendingIterator.remove();
-          availableFetchers.add(dataFetcher);
+          if (!(element instanceof Finishmark)) {
+            availableFetchers.add(dataFetcher);
+          }
 
         } catch (final NoSuchElementException e) {
           // The current data fetcher is still pending.. try next data fetcher


Mime
View raw message