nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] branch 717-TGE updated: address comments - remove unnecessary local variables
Date Tue, 08 May 2018 13:29:29 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/717-TGE by this push:
     new d667589  address comments - remove unnecessary local variables
d667589 is described below

commit d667589e7989eb1fd827f28642e0587b52f01805
Author: Jeongyoon Eo <jeongyoon0807@gmail.com>
AuthorDate: Tue May 8 22:29:19 2018 +0900

    address comments - remove unnecessary local variables
---
 .../snu/nemo/examples/beam/MapReduceITCase.java    |   7 +-
 .../nemo/runtime/executor/TaskGroupExecutor.java   | 167 +++++++++------------
 .../executor/datatransfer/TaskDataHandler.java     | 116 ++++++--------
 3 files changed, 116 insertions(+), 174 deletions(-)

diff --git a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
index 72b1035..5bc8379 100644
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
@@ -51,7 +51,7 @@ public final class MapReduceITCase {
   @After
   public void tearDown() throws Exception {
     ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, testResourceFileName);
-    ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    //ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
   }
 
   @Test (timeout = TIMEOUT)
@@ -62,6 +62,7 @@ public final class MapReduceITCase {
         .addUserArgs(inputFilePath, outputFilePath)
         .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
         .build());
+    ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
   }
 
   @Test (timeout = TIMEOUT)
@@ -72,6 +73,7 @@ public final class MapReduceITCase {
         .addUserArgs(inputFilePath, outputFilePath)
         .addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
         .build());
+    ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
   }
 
   @Test (timeout = TIMEOUT)
@@ -82,6 +84,7 @@ public final class MapReduceITCase {
         .addUserArgs(inputFilePath, outputFilePath)
         .addOptimizationPolicy(DisaggregationPolicyParallelismFive.class.getCanonicalName())
         .build());
+    ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
   }
 
   @Test (timeout = TIMEOUT)
@@ -92,6 +95,7 @@ public final class MapReduceITCase {
         .addUserArgs(inputFilePath, outputFilePath)
         .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
         .build());
+    ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
   }
 
   /**
@@ -106,5 +110,6 @@ public final class MapReduceITCase {
         .addUserArgs(inputFilePath, outputFilePath)
         .addOptimizationPolicy(DataSkewPolicyParallelismFive.class.getCanonicalName())
         .build());
+    ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 7595b43..2edc57a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@ -34,6 +34,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,7 +43,6 @@ import org.slf4j.LoggerFactory;
  */
 public final class TaskGroupExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(TaskGroupExecutor.class.getName());
-
   private final DAG<Task, RuntimeEdge<Task>> taskGroupDag;
   private final String taskGroupId;
   private final int taskGroupIdx;
@@ -51,13 +51,8 @@ public final class TaskGroupExecutor {
   private final List<PhysicalStageEdge> stageOutgoingEdges;
   private final DataTransferFactory channelFactory;
   private final MetricCollector metricCollector;
-
-  private final List<InputReader> inputReaders;
   private final Map<InputReader, List<TaskDataHandler>> inputReaderToDataHandlersMap;
-  private final Map<String, Iterator> idToSrcIteratorMap;
-  private final Map<String, List<TaskDataHandler>> srcIteratorIdToDataHandlersMap;
-  private final Map<String, List<TaskDataHandler>> iteratorIdToDataHandlersMap;
-  private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> partitionQueue;
+  private final LinkedBlockingQueue<Pair<String, DataUtil.IteratorWithNumBytes>> interStageItrReadyQueue;
   private List<TaskDataHandler> taskDataHandlers;
   private Map<OutputCollectorImpl, List<TaskDataHandler>> outputToChildrenDataHandlersMap;
   private final Set<String> finishedTaskIds;
@@ -98,23 +93,15 @@ public final class TaskGroupExecutor {
     this.metricCollector = new MetricCollector(metricMessageSender);
     this.logicalTaskIdPutOnHold = null;
     this.isExecutionRequested = false;
-
-    this.inputReaders = new ArrayList<>();
     this.inputReaderToDataHandlersMap = new ConcurrentHashMap<>();
-    this.idToSrcIteratorMap = new HashMap<>();
-    this.srcIteratorIdToDataHandlersMap = new HashMap<>();
-    this.iteratorIdToDataHandlersMap = new ConcurrentHashMap<>();
-    this.partitionQueue = new LinkedBlockingQueue<>();
+    this.interStageItrReadyQueue = new LinkedBlockingQueue<>();
     this.outputToChildrenDataHandlersMap = new HashMap<>();
     this.taskDataHandlers = new ArrayList<>();
-
     this.finishedTaskIds = new HashSet<>();
     this.numPartitions = 0;
-
     this.serBlockSize = 0;
     this.encodedBlockSize = 0;
 
-
     initialize();
   }
 
@@ -133,48 +120,45 @@ public final class TaskGroupExecutor {
     // Initialize data handlers for each Task.
     taskGroupDag.topologicalDo(task -> taskDataHandlers.add(new TaskDataHandler(task)));
 
-    // Initialize data transfer.
-    // Construct a pointer-based DAG of TaskDataHandlers that are used for data transfer.
-    // 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
-    // to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
+    // Initialize data transfer by constructing a DAG of TaskDataHandlers.
+    // Referencing this DAG doesn't require element-wise extra overhead of calculating hash values(HashMap)
+    // or iterating Lists.
     taskGroupDag.topologicalDo(task -> {
       final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(task);
       final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(task);
       final TaskDataHandler dataHandler = getTaskDataHandler(task);
 
       // Set data handlers of children tasks.
-      // This forms a pointer-based DAG of TaskDataHandlers.
       final List<TaskDataHandler> childrenDataHandlers = new ArrayList<>();
       taskGroupDag.getChildren(task.getId()).forEach(child ->
           childrenDataHandlers.add(getTaskDataHandler(child)));
       dataHandler.setChildrenDataHandler(childrenDataHandlers);
 
-      // Add InputReaders for inter-stage data transfer
+      // Add InputReaders for inter-stage data transfer.
       inEdgesFromOtherStages.forEach(physicalStageEdge -> {
         final InputReader inputReader = channelFactory.createReader(
             taskGroupIdx, physicalStageEdge.getSrcVertex(), physicalStageEdge);
 
         // For InputReaders that have side input, collect them separately.
         if (inputReader.isSideInputReader()) {
-          dataHandler.addSideInputFromOtherStages(inputReader);
+          dataHandler.addInterStageSideInput(inputReader);
         } else {
-          inputReaders.add(inputReader);
           inputReaderToDataHandlersMap.putIfAbsent(inputReader, new ArrayList<>());
           inputReaderToDataHandlersMap.get(inputReader).add(dataHandler);
         }
       });
 
-      // Add OutputWriters for inter-stage data transfer
+      // Add OutputWriters for inter-stage data transfer.
       outEdgesToOtherStages.forEach(physicalStageEdge -> {
         final OutputWriter outputWriter = channelFactory.createWriter(
             task, taskGroupIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
         dataHandler.addOutputWriters(outputWriter);
       });
 
-      // Add InputPipes for intra-stage data transfer
-      addInputFromThisStage(task, dataHandler);
+      // Set task input to be processed in intra-stage data transfer.
+      setIntraStageInput(task);
 
-      // Add OutputPipe for intra-stage data transfer
+      // Set a place to emit task output in intra-stage data transfer.
       setOutputCollector(task, dataHandler);
     });
 
@@ -185,11 +169,11 @@ public final class TaskGroupExecutor {
         final Map<Transform, Object> sideInputMap = new HashMap<>();
         final TaskDataHandler dataHandler = getTaskDataHandler(task);
         // Check and collect side inputs.
-        if (!dataHandler.getSideInputFromOtherStages().isEmpty()) {
-          sideInputFromOtherStages(task, sideInputMap);
+        if (!dataHandler.getInterStageSideInput().isEmpty()) {
+          setInterStageSideInput(task, sideInputMap);
         }
-        if (!dataHandler.getSideInputFromThisStage().isEmpty()) {
-          sideInputFromThisStage(task, sideInputMap);
+        if (!dataHandler.getIntraStageSideInput().isEmpty()) {
+          setIntraStageSideInput(task, sideInputMap);
         }
 
         final Transform.Context transformContext = new ContextImpl(sideInputMap);
@@ -218,27 +202,28 @@ public final class TaskGroupExecutor {
    *
    * @param task the Task to add input OutputCollectors to.
    */
-  private void addInputFromThisStage(final Task task, final TaskDataHandler dataHandler) {
+  private void setIntraStageInput(final Task task) {
     List<Task> parentTasks = taskGroupDag.getParents(task.getId());
     final String physicalTaskId = getPhysicalTaskId(task.getId());
+    final TaskDataHandler dataHandler = getTaskDataHandler(task);
 
     if (parentTasks != null) {
       parentTasks.forEach(parent -> {
         final OutputCollectorImpl parentOutputCollector = getTaskDataHandler(parent).getOutputCollector();
         if (parentOutputCollector.hasSideInputFor(physicalTaskId)) {
-          dataHandler.addSideInputFromThisStage(parentOutputCollector);
+          dataHandler.addIntraStageSideInput(parentOutputCollector);
         } else {
-          dataHandler.addInputFromThisStages(parentOutputCollector);
+          dataHandler.addIntraStageInput(parentOutputCollector);
         }
       });
     }
   }
 
   /**
-   * Add output outputCollectors to each {@link Task}.
-   * Output outputCollector denotes the one and only one outputCollector of this task.
-   * Check the outgoing edges that will use this outputCollector,
-   * and set this outputCollector as side input if any one of the edges uses this outputCollector as side input.
+   * Add outputCollector to each {@link Task}.
+   * Each task has one and only one OutputCollector.
+   * Check the outgoing edges that will use this OutputCollector, and set it as side input
+   * if any one of the edges uses it as side input.
    *
    * @param task the Task to add output outputCollectors to.
    */
@@ -289,24 +274,21 @@ public final class TaskGroupExecutor {
     taskGroupDag.topologicalDo(task -> {
       if (task instanceof BoundedSourceTask) {
         try {
-          final String iteratorId = generateIteratorId();
           final Iterator iterator = ((BoundedSourceTask) task).getReadable().read().iterator();
-          idToSrcIteratorMap.putIfAbsent(iteratorId, iterator);
-          srcIteratorIdToDataHandlersMap.putIfAbsent(iteratorId, new ArrayList<>());
-          srcIteratorIdToDataHandlersMap.get(iteratorId).add(getTaskDataHandler(task));
+          final TaskDataHandler dataHandler = getTaskDataHandler(task);
+          dataHandler.addIdToSrcItr(generateIteratorId(), iterator);
         } catch (final BlockFetchException ex) {
           taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
               Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
-          LOG.info("{} Execution Failed (Recoverable: input read failure)! Exception: {}",
+          LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}",
               taskGroupId, ex.toString());
         } catch (final Exception e) {
           taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
               Optional.empty(), Optional.empty());
-          LOG.info("{} Execution Failed! Exception: {}", taskGroupId, e.toString());
+          LOG.error("{} Execution Failed! Exception: {}", taskGroupId, e.toString());
           throw new RuntimeException(e);
         }
       }
-      // TODO #XXX: Support other types of source tasks, i. e. InitializedSourceTask
     });
   }
 
@@ -316,21 +298,17 @@ public final class TaskGroupExecutor {
       numPartitions += futures.size();
 
       // Add consumers which will push iterator when the futures are complete.
-      futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) -> {
+      futures.forEach(compFuture -> compFuture.whenComplete((itr, exception) -> {
         if (exception != null) {
           throw new BlockFetchException(exception);
         }
 
-        final String iteratorId = generateIteratorId();
-        if (iteratorIdToDataHandlersMap.containsKey(iteratorId)) {
-          throw new RuntimeException("iteratorIdToDataHandlersMap already contains " + iteratorId);
-        } else {
-          iteratorIdToDataHandlersMap.computeIfAbsent(iteratorId, absentIteratorId -> dataHandlers);
-          try {
-            partitionQueue.put(Pair.of(iteratorId, iterator));
-          } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while receiving iterator " + e);
-          }
+        try {
+          final String itrId = generateIteratorId();
+          interStageItrReadyQueue.put(Pair.of(itrId, itr));
+          dataHandlers.forEach(dataHandler -> dataHandler.addInterStageItrId(itrId));
+        } catch (InterruptedException e) {
+          throw new RuntimeException("Interrupted while receiving iterator " + e);
         }
       }));
     });
@@ -345,14 +323,14 @@ public final class TaskGroupExecutor {
   }
 
   private void initializeOutputToChildrenDataHandlersMap() {
-    srcIteratorIdToDataHandlersMap.values().forEach(dataHandlers ->
-        dataHandlers.forEach(dataHandler -> {
-          outputToChildrenDataHandlersMap.putIfAbsent(dataHandler.getOutputCollector(), dataHandler.getChildren());
-        }));
-    iteratorIdToDataHandlersMap.values().forEach(dataHandlers ->
-        dataHandlers.forEach(dataHandler -> {
-          outputToChildrenDataHandlersMap.putIfAbsent(dataHandler.getOutputCollector(), dataHandler.getChildren());
-        }));
+    taskDataHandlers.forEach(dataHandler -> {
+      // If this task has iterator input to process,
+      // maintain a map of its output and its children.
+      if (!dataHandler.getIdToSrcItrMap().isEmpty()
+          || !dataHandler.getInterStageItrId().isEmpty()) {
+        outputToChildrenDataHandlersMap.putIfAbsent(dataHandler.getOutputCollector(), dataHandler.getChildren());
+      }
+    });
   }
 
   private void updateOutputToChildrenDataHandlersMap() {
@@ -375,8 +353,8 @@ public final class TaskGroupExecutor {
     }
   }
 
-  private void sideInputFromOtherStages(final Task task, final Map<Transform, Object> sideInputMap) {
-    getTaskDataHandler(task).getSideInputFromOtherStages().forEach(sideInputReader -> {
+  private void setInterStageSideInput(final Task task, final Map<Transform, Object> sideInputMap) {
+    getTaskDataHandler(task).getInterStageSideInput().forEach(sideInputReader -> {
       try {
         final DataUtil.IteratorWithNumBytes sideInputIterator = sideInputReader.read().get(0).get();
         final Object sideInput = getSideInput(sideInputIterator);
@@ -406,9 +384,8 @@ public final class TaskGroupExecutor {
     });
   }
 
-  private void sideInputFromThisStage(final Task task, final Map<Transform, Object> sideInputMap) {
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
-    getTaskDataHandler(task).getSideInputFromThisStage().forEach(input -> {
+  private void setIntraStageSideInput(final Task task, final Map<Transform, Object> sideInputMap) {
+    getTaskDataHandler(task).getIntraStageSideInput().forEach(input -> {
       // because sideInput is only 1 element in the outputCollector
       Object sideInput = input.remove();
       final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
@@ -452,26 +429,19 @@ public final class TaskGroupExecutor {
 
     // Execute the TaskGroup DAG.
     try {
-      srcIteratorIdToDataHandlersMap.forEach((srcIteratorId, dataHandlers) -> {
-        Iterator iterator = idToSrcIteratorMap.get(srcIteratorId);
-        iterator.forEachRemaining(element -> {
-          for (final TaskDataHandler dataHandler : dataHandlers) {
-            runTask(dataHandler, element);
-          }
-        });
-      });
+      taskDataHandlers.stream().filter(dataHandler -> !dataHandler.getIdToSrcItrMap().isEmpty())
+          .forEach(dataHandler ->
+              dataHandler.getIdToSrcItrMap().forEach((srcItrId, srcItr) ->
+                  srcItr.forEachRemaining(element -> runTask(dataHandler.getTask(), element))));
 
-      // Process data from other stages.
       for (int currPartition = 0; currPartition < numPartitions; currPartition++) {
-        Pair<String, DataUtil.IteratorWithNumBytes> idToIteratorPair = partitionQueue.take();
+        Pair<String, DataUtil.IteratorWithNumBytes> idToIteratorPair = interStageItrReadyQueue.take();
         final String iteratorId = idToIteratorPair.left();
         final DataUtil.IteratorWithNumBytes iterator = idToIteratorPair.right();
-        List<TaskDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
-        iterator.forEachRemaining(element -> {
-          for (final TaskDataHandler dataHandler : dataHandlers) {
-            runTask(dataHandler, element);
-          }
-        });
+
+        taskDataHandlers.stream().filter(dataHandler ->
+            dataHandler.getInterStageItrId().contains(iteratorId)).forEach(dataHandler ->
+            iterator.forEachRemaining(element -> runTask(dataHandler.getTask(), element)));
 
         // Collect metrics on block size if possible.
         try {
@@ -479,14 +449,14 @@ public final class TaskGroupExecutor {
         } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
           serBlockSize = -1;
         } catch (final IllegalStateException e) {
-          LOG.error("IllegalState ", e);
+          LOG.error("Failed to get the number of bytes of serialized data - the data is not ready yet", e);
         }
         try {
           encodedBlockSize += iterator.getNumEncodedBytes();
         } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
           encodedBlockSize = -1;
         } catch (final IllegalStateException e) {
-          LOG.error("IllegalState ", e);
+          LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet", e);
         }
       }
       inputReadEndTime = System.currentTimeMillis();
@@ -513,10 +483,8 @@ public final class TaskGroupExecutor {
             final Object element = outputCollector.remove();
 
             // Pass outputCollectorOwnerTask's output to its children tasks recursively.
-            if (!childrenDataHandlers.isEmpty()) {
-              for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
-                runTask(childDataHandler, element);
-              }
+            for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
+              runTask(childDataHandler.getTask(), element);
             }
 
             // Write element-wise to OutputWriters if any and close the OutputWriters.
@@ -538,12 +506,12 @@ public final class TaskGroupExecutor {
     } catch (final BlockWriteException ex2) {
       taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
           Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
-      LOG.info("{} Execution Failed (Recoverable: output write failure)! Exception: {}",
+      LOG.error("{} Execution Failed (Recoverable: output write failure)! Exception: {}",
           taskGroupId, ex2.toString());
     } catch (final Exception e) {
       taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
           Optional.empty(), Optional.empty());
-      LOG.info("{} Execution Failed! Exception: {}",
+      LOG.error("{} Execution Failed! Exception: {}",
           taskGroupId, e.toString());
       throw new RuntimeException(e);
     }
@@ -552,6 +520,7 @@ public final class TaskGroupExecutor {
     final boolean available = serBlockSize >= 0;
     putReadBytesMetric(available, serBlockSize, encodedBlockSize, metric);
     metricCollector.endMeasurement(taskGroupId, metric);
+
     if (logicalTaskIdPutOnHold == null) {
       taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.COMPLETE, Optional.empty(), Optional.empty());
     } else {
@@ -565,12 +534,12 @@ public final class TaskGroupExecutor {
   /**
    * Processes an OperatorTask.
    *
-   * @param dataHandler TaskDataHandler of a task to execute.
+   * @param task Task to run.
+   * @param dataElement input data element to process.
    */
-  private void runTask(final TaskDataHandler dataHandler, final Object dataElement) {
-    final Task task = dataHandler.getTask();
+  private void runTask(final Task task, final Object dataElement) {
+    final TaskDataHandler dataHandler = getTaskDataHandler(task);
     final OutputCollectorImpl outputCollector = dataHandler.getOutputCollector();
-    final String physicalTaskId = getPhysicalTaskId(task.getId());
 
     // Process element-wise depending on the Task type
     if (task instanceof BoundedSourceTask) {
@@ -603,7 +572,7 @@ public final class TaskGroupExecutor {
       List<TaskDataHandler> childrenDataHandlers = dataHandler.getChildren();
       if (!childrenDataHandlers.isEmpty()) {
         for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
-          runTask(childDataHandler, element);
+          runTask(childDataHandler.getTask(), element);
         }
       }
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
index 255fa9e..467b920 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
@@ -16,39 +16,34 @@
 package edu.snu.nemo.runtime.executor.datatransfer;
 
 import edu.snu.nemo.runtime.common.plan.physical.Task;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 /**
  * Per-Task data handler.
- * This is a wrapper class that handles data transfer of a Task.
- * As TaskGroup input is processed element-wise, Task output element percolates down
- * through the DAG of children TaskDataHandlers.
  */
 public final class TaskDataHandler {
-  private static final Logger LOG = LoggerFactory.getLogger(TaskDataHandler.class.getName());
+  private final Task task;
+  private List<TaskDataHandler> children;
+  private final List<OutputCollectorImpl> intraStageInput;
+  private final List<InputReader> interStageSideInput;
+  private final List<OutputCollectorImpl> intraStageSideInput;
+  private OutputCollectorImpl outputCollector;
+  private final List<OutputWriter> outputWriters;
+  private final Map<String, Iterator> idToSrcItrMap;
+  private final List<String> interStageItrId;
 
   public TaskDataHandler(final Task task) {
     this.task = task;
     this.children = new ArrayList<>();
-    this.inputFromThisStage = new ArrayList<>();
-    this.sideInputFromOtherStages = new ArrayList<>();
-    this.sideInputFromThisStage = new ArrayList<>();
+    this.intraStageInput = new ArrayList<>();
+    this.interStageSideInput = new ArrayList<>();
+    this.intraStageSideInput = new ArrayList<>();
+    this.idToSrcItrMap = new HashMap<>();
+    this.interStageItrId = new ArrayList<>();
     this.outputCollector = null;
     this.outputWriters = new ArrayList<>();
   }
 
-  private final Task task;
-  private List<TaskDataHandler> children;
-  private final List<OutputCollectorImpl> inputFromThisStage;
-  private final List<InputReader> sideInputFromOtherStages;
-  private final List<OutputCollectorImpl> sideInputFromThisStage;
-  private OutputCollectorImpl outputCollector;
-  private final List<OutputWriter> outputWriters;
-
   public Task getTask() {
     return task;
   }
@@ -57,88 +52,61 @@ public final class TaskDataHandler {
     return children;
   }
 
-  /**
-   * Get intra-TaskGroup input from parent tasks.
-   * We keep parent tasks' OutputCollectors, as they're the place where parent task output
-   * becomes available element-wise.
-   * @return OutputCollectors of all parent tasks.
-   */
-  public List<OutputCollectorImpl> getInputFromThisStage() {
-    return inputFromThisStage;
+  public Map<String, Iterator> getIdToSrcItrMap() {
+    return idToSrcItrMap;
+  }
+
+  public List<String> getInterStageItrId() {
+    return interStageItrId;
   }
 
-  /**
-   * Get side input from other TaskGroup.
-   * @return InputReader that has side input.
-   */
-  public List<InputReader> getSideInputFromOtherStages() {
-    return sideInputFromOtherStages;
+  public List<InputReader> getInterStageSideInput() {
+    return interStageSideInput;
   }
 
-  /**
-   * Get intra-TaskGroup side input from parent tasks.
-   * Just like normal intra-TaskGroup inputs, intra-TaskGroup side inputs are
-   * collected in parent tasks' OutputCollectors.
-   * @return OutputCollectors of all parent tasks which are marked as having side input.
-   */
-  public List<OutputCollectorImpl> getSideInputFromThisStage() {
-    return sideInputFromThisStage;
+  public List<OutputCollectorImpl> getIntraStageSideInput() {
+    return intraStageSideInput;
   }
 
-  /**
-   * Get OutputCollector of this task.
-   * @return OutputCollector of this task.
-   */
   public OutputCollectorImpl getOutputCollector() {
     return outputCollector;
   }
 
-  /**
-   * Get OutputWriters of this task.
-   * @return OutputWriters of this task.
-   */
   public List<OutputWriter> getOutputWriters() {
     return outputWriters;
   }
 
-  /**
-   * Set a DAG of children tasks' DataHandlers.
-   */
   public void setChildrenDataHandler(final List<TaskDataHandler> childrenDataHandler) {
     children = childrenDataHandler;
   }
 
-  /**
-   * Add OutputCollector of a parent task.
-   */
-  public void addInputFromThisStages(final OutputCollectorImpl input) {
-    inputFromThisStage.add(input);
+  public void addIdToSrcItr(final String srcItrId, final Iterator srcItr) {
+    idToSrcItrMap.putIfAbsent(srcItrId, srcItr);
+  }
+
+  public void addInterStageItrId(final String itrId) {
+    interStageItrId.add(itrId);
+  }
+
+  // Add OutputCollectors of parent tasks.
+  public void addIntraStageInput(final OutputCollectorImpl input) {
+    intraStageInput.add(input);
   }
 
-  /**
-   * Add InputReader that contains side input from other TaskGroup.
-   */
-  public void addSideInputFromOtherStages(final InputReader sideInputReader) {
-    sideInputFromOtherStages.add(sideInputReader);
+  public void addInterStageSideInput(final InputReader sideInputReader) {
+    interStageSideInput.add(sideInputReader);
   }
 
-  /**
-   * Add parent OutputCollector that contains side input from the parent task.
-   */
-  public void addSideInputFromThisStage(final OutputCollectorImpl ocAsSideInput) {
-    sideInputFromThisStage.add(ocAsSideInput);
+  public void addIntraStageSideInput(final OutputCollectorImpl ocAsSideInput) {
+    intraStageSideInput.add(ocAsSideInput);
   }
 
-  /**
-   * Set OutputCollector of this task.
-   */
+  // Set OutputCollector of this task.
+  // Mark if the data from this OutputCollector is used as side input.
   public void setOutputCollector(final OutputCollectorImpl oc) {
     outputCollector = oc;
   }
 
-  /**
-   * Add OutputWriter of this task.
-   */
   public void addOutputWriters(final OutputWriter outputWriter) {
     outputWriters.add(outputWriter);
   }

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message