nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] 01/01: merge master
Date Wed, 09 May 2018 07:50:11 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch 717-TGE
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit c4d29ec3d056ed8262be3c477da70a5c78442eb8
Merge: 13510f4 033d814
Author: Jeongyoon Eo <jeongyoon0807@gmail.com>
AuthorDate: Wed May 9 16:49:56 2018 +0900

    merge master

 .github/CODE_OF_CONDUCT.md                         |  73 +++++++
 .github/CONTRIBUTING.md                            |  44 +++++
 .github/pull_request_template.md                   |  15 ++
 .travis.yml                                        |   8 +-
 README.md                                          |   3 +
 bin/sonar_qube.sh                                  |  20 ++
 .../main/java/edu/snu/nemo/client/JobLauncher.java |   1 -
 .../frontend/beam/NemoPipelineVisitor.java         |   3 +-
 .../spark/core/java/SparkFrontendUtils.java        |  14 +-
 .../compiler/frontend/spark/sql/SparkSession.java  |   9 +-
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |   9 -
 .../nemo/examples/beam/AlternatingLeastSquare.java |   2 +-
 examples/resources/sample_executor_resources.json  |   6 +-
 .../examples/spark/sql/JavaSparkSQLExample.java    |   4 +-
 pom.xml                                            |   9 +
 .../common/message/ncs/NcsMessageContext.java      |   6 +-
 .../main/java/edu/snu/nemo/driver/NemoDriver.java  |   6 +-
 .../edu/snu/nemo/runtime/executor/Executor.java    |   3 +-
 .../nemo/runtime/executor/MetricManagerWorker.java |   2 +-
 .../nemo/runtime/executor/TaskGroupExecutor.java   |  11 +-
 .../executor/bytetransfer/ByteInputContext.java    |  14 +-
 .../executor/bytetransfer/ByteTransport.java       |   1 +
 .../runtime/executor/data/BlockManagerWorker.java  |  14 +-
 .../nemo/runtime/master/BlockManagerMaster.java    |   2 +-
 .../snu/nemo/runtime/master/JobStateManager.java   |  12 +-
 .../master/scheduler/BatchSingleJobScheduler.java  |  23 ++-
 ...pQueue.java => PendingTaskGroupCollection.java} |  29 ++-
 .../scheduler/RoundRobinSchedulingPolicy.java      | 155 ++++-----------
 .../nemo/runtime/master/scheduler/Scheduler.java   |   3 +-
 .../runtime/master/scheduler/SchedulerRunner.java  | 120 ++++++++++--
 .../runtime/master/scheduler/SchedulingPolicy.java |   6 -
 .../scheduler/SingleJobTaskGroupCollection.java    | 216 +++++++++++++++++++++
 .../master/scheduler/SingleJobTaskGroupQueue.java  | 212 --------------------
 .../SourceLocationAwareSchedulingPolicy.java       | 151 ++++----------
 .../snu/nemo/tests/runtime/RuntimeTestUtil.java    |   9 +-
 .../executor/datatransfer/DataTransferTest.java    |   7 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     |  12 +-
 .../master/scheduler/FaultToleranceTest.java       |  39 ++--
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |  88 +--------
 .../master/scheduler/SingleTaskGroupQueueTest.java | 152 ++++-----------
 .../SourceLocationAwareSchedulingPolicyTest.java   |  73 +------
 41 files changed, 742 insertions(+), 844 deletions(-)

diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index a7e03b0,2c49300..2e974f7
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@@ -317,374 -253,202 +317,377 @@@ public final class TaskGroupExecutor 
    }
  
    /**
 -   * Processes an OperatorTask.
 -   * @param operatorTask to execute
 +   * Get input iterator from BoundedSource and bind it with id.
     */
 -  private void launchOperatorTask(final OperatorTask operatorTask) {
 -    final Map<Transform, Object> sideInputMap = new HashMap<>();
 -    final List<DataUtil.IteratorWithNumBytes> sideInputIterators = new ArrayList<>();
 -    final String physicalTaskId = getPhysicalTaskId(operatorTask.getId());
 +  private void prepareInputFromSource() {
 +    taskGroupDag.topologicalDo(task -> {
 +      if (task instanceof BoundedSourceTask) {
 +        try {
 +          final String iteratorId = generateIteratorId();
 +          final Iterator iterator = ((BoundedSourceTask) task).getReadable().read().iterator();
 +          idToSrcIteratorMap.putIfAbsent(iteratorId, iterator);
 +          srcIteratorIdToDataHandlersMap.putIfAbsent(iteratorId, new ArrayList<>());
 +          srcIteratorIdToDataHandlersMap.get(iteratorId).add(getTaskDataHandler(task));
 +        } catch (final BlockFetchException ex) {
 +          taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
 +              Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
 +          LOG.info("{} Execution Failed (Recoverable: input read failure)! Exception: {}",
 +              taskGroupId, ex.toString());
 +        } catch (final Exception e) {
 +          taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
 +              Optional.empty(), Optional.empty());
 +          LOG.info("{} Execution Failed! Exception: {}", taskGroupId, e.toString());
 +          throw new RuntimeException(e);
 +        }
 +      }
 +      // TODO #XXX: Support other types of source tasks, i.e. InitializedSourceTask
 +    });
 +  }
  
 -    final Map<String, Object> metric = new HashMap<>();
 -    metricCollector.beginMeasurement(physicalTaskId, metric);
 -    long accumulatedBlockedReadTime = 0;
 -    long accumulatedWriteTime = 0;
 -    long accumulatedSerializedBlockSize = 0;
 -    long accumulatedEncodedBlockSize = 0;
 -    boolean blockSizeAvailable = true;
 -
 -    final long readStartTime = System.currentTimeMillis();
 -    // Check for side inputs
 -    physicalTaskIdToInputReaderMap.get(physicalTaskId).stream().filter(InputReader::isSideInputReader)
 -        .forEach(inputReader -> {
 +  /**
 +   * Get input iterator from other stages received in the form of CompletableFuture
 +   * and bind it with id.
 +   */
 +  private void prepareInputFromOtherStages() {
 +    inputReaderToDataHandlersMap.forEach((inputReader, dataHandlers) -> {
 +      final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures =
inputReader.read();
 +      numPartitions += futures.size();
 +
 +      // Add consumers which will push iterator when the futures are complete.
 +      futures.forEach(compFuture -> compFuture.whenComplete((iterator, exception) ->
{
 +        if (exception != null) {
 +          throw new BlockFetchException(exception);
 +        }
 +
 +        final String iteratorId = generateIteratorId();
 +        if (iteratorIdToDataHandlersMap.containsKey(iteratorId)) {
 +          throw new RuntimeException("iteratorIdToDataHandlersMap already contains " + iteratorId);
 +        } else {
 +          iteratorIdToDataHandlersMap.computeIfAbsent(iteratorId, absentIteratorId ->
dataHandlers);
            try {
 -            if (!inputReader.isSideInputReader()) {
 -              // Trying to get sideInput from a reader that does not handle sideInput.
 -              // This is probably a bug. We're not trying to recover but ensure a hard fail.
 -              throw new RuntimeException("Trying to get sideInput from non-sideInput reader");
 -            }
 -            final DataUtil.IteratorWithNumBytes sideInputIterator = inputReader.read().get(0).get();
 -            final Object sideInput = getSideInput(sideInputIterator);
 -
 -            final RuntimeEdge inEdge = inputReader.getRuntimeEdge();
 -            final Transform srcTransform;
 -            if (inEdge instanceof PhysicalStageEdge) {
 -              srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex())
 -                  .getTransform();
 -            } else {
 -              srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
 -            }
 -            sideInputMap.put(srcTransform, sideInput);
 -            sideInputIterators.add(sideInputIterator);
 +            partitionQueue.put(Pair.of(iteratorId, iterator));
-           } catch (InterruptedException e) {
-             throw new RuntimeException("Interrupted while receiving iterator " + e);
+           } catch (final InterruptedException e) {
+             Thread.currentThread().interrupt();
+             throw new BlockFetchException(e);
 -          } catch (final ExecutionException e) {
 -            throw new BlockFetchException(e);
            }
 -        });
 +        }
 +      }));
 +    });
 +  }
  
 -    for (final DataUtil.IteratorWithNumBytes iterator : sideInputIterators) {
 -      try {
 -        accumulatedSerializedBlockSize += iterator.getNumSerializedBytes();
 -        accumulatedEncodedBlockSize += iterator.getNumEncodedBytes();
 -      } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
 -        blockSizeAvailable = false;
 -        break;
 -      }
 -    }
 +  /**
 +   * Check whether all tasks in this TaskGroup are finished.
 +   *
 +   * @return true if all tasks are finished.
 +   */
 +  private boolean finishedAllTasks() {
 +    // Total number of Tasks in this TaskGroup
 +    int taskNum = taskDataHandlers.size();
 +    int finishedTaskNum = finishedTaskIds.size();
  
 -    final Transform.Context transformContext = new ContextImpl(sideInputMap);
 -    final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
 +    return finishedTaskNum == taskNum;
 +  }
  
 -    final Transform transform = operatorTask.getTransform();
 -    transform.prepare(transformContext, outputCollector);
 -
 -    // Check for non-side inputs
 -    // This blocking queue contains the pairs having data and source vertex ids.
 -    final BlockingQueue<Pair<DataUtil.IteratorWithNumBytes, String>> dataQueue
= new LinkedBlockingQueue<>();
 -    final AtomicInteger sourceParallelism = new AtomicInteger(0);
 -    physicalTaskIdToInputReaderMap.get(physicalTaskId).stream().filter(inputReader ->
!inputReader.isSideInputReader())
 -        .forEach(inputReader -> {
 -          final List<CompletableFuture<DataUtil.IteratorWithNumBytes>> futures
= inputReader.read();
 -          final String srcIrVtxId = inputReader.getSrcIrVertexId();
 -          sourceParallelism.getAndAdd(inputReader.getSourceParallelism());
 -          // Add consumers which will push the data to the data queue when it ready to the
futures.
 -          futures.forEach(compFuture -> compFuture.whenComplete((data, exception) ->
{
 -            if (exception != null) {
 -              throw new BlockFetchException(exception);
 -            }
 -            dataQueue.add(Pair.of(data, srcIrVtxId));
 -          }));
 -        });
 -    final long readFutureEndTime = System.currentTimeMillis();
 -    // Consumes all of the partitions from incoming edges.
 -    for (int srcTaskNum = 0; srcTaskNum < sourceParallelism.get(); srcTaskNum++) {
 +  /**
 +   * Initialize the very first map of OutputCollector-children task DAG.
 +   * In each map entry, the OutputCollector contains input data to be propagated through
 +   * the children task DAG.
 +   */
 +  private void initializeOutputToChildrenDataHandlersMap() {
 +    srcIteratorIdToDataHandlersMap.values().forEach(dataHandlers ->
 +        dataHandlers.forEach(dataHandler -> {
 +          outputToChildrenDataHandlersMap.putIfAbsent(dataHandler.getOutputCollector(),
dataHandler.getChildren());
 +        }));
 +    iteratorIdToDataHandlersMap.values().forEach(dataHandlers ->
 +        dataHandlers.forEach(dataHandler -> {
 +          outputToChildrenDataHandlersMap.putIfAbsent(dataHandler.getOutputCollector(),
dataHandler.getChildren());
 +        }));
 +  }
 +
 +  /**
 +   * Update the map of OutputCollector-children task DAG.
 +   */
 +  private void updateOutputToChildrenDataHandlersMap() {
 +    Map<OutputCollectorImpl, List<TaskDataHandler>> currentMap = outputToChildrenDataHandlersMap;
 +    Map<OutputCollectorImpl, List<TaskDataHandler>> updatedMap = new HashMap<>();
 +
 +    currentMap.values().forEach(dataHandlers ->
 +        dataHandlers.forEach(dataHandler -> {
 +          updatedMap.putIfAbsent(dataHandler.getOutputCollector(), dataHandler.getChildren());
 +        })
 +    );
 +
 +    outputToChildrenDataHandlersMap = updatedMap;
 +  }
 +
 +  /**
 +   * Update the map of OutputCollector-children task DAG.
 +   *
 +   * @param task the Task with the transform to close.
 +   */
 +  private void closeTransform(final Task task) {
 +    if (task instanceof OperatorTask) {
 +      Transform transform = ((OperatorTask) task).getTransform();
 +      transform.close();
 +    }
 +  }
 +
 +  /**
 +   * As a preprocessing of side input data, get inter stage side input
 +   * and form a map of source transform-side input.
 +   *
 +   * @param task the task which receives side input from other stages.
 +   * @param sideInputMap the map of source transform-side input to build.
 +   */
 +  private void sideInputFromOtherStages(final Task task, final Map<Transform, Object>
sideInputMap) {
 +    getTaskDataHandler(task).getSideInputFromOtherStages().forEach(sideInputReader ->
{
        try {
 -        // Because the data queue is a blocking queue, we may need to wait some available
data to be pushed.
 -        final long blockedReadStartTime = System.currentTimeMillis();
 -        final Pair<DataUtil.IteratorWithNumBytes, String> availableData = dataQueue.take();
 -        final long blockedReadEndTime = System.currentTimeMillis();
 -        accumulatedBlockedReadTime += blockedReadEndTime - blockedReadStartTime;
 -        transform.onData(availableData.left(), availableData.right());
 -        if (blockSizeAvailable) {
 -          try {
 -            accumulatedSerializedBlockSize += availableData.left().getNumSerializedBytes();
 -            accumulatedEncodedBlockSize += availableData.left().getNumEncodedBytes();
 -          } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e)
{
 -            blockSizeAvailable = false;
 -          }
 +        final DataUtil.IteratorWithNumBytes sideInputIterator = sideInputReader.read().get(0).get();
 +        final Object sideInput = getSideInput(sideInputIterator);
 +        final RuntimeEdge inEdge = sideInputReader.getRuntimeEdge();
 +        final Transform srcTransform;
 +        if (inEdge instanceof PhysicalStageEdge) {
 +          srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
 +        } else {
 +          srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
 +        }
 +        sideInputMap.put(srcTransform, sideInput);
 +
 +        // Collect metrics on block size if possible.
 +        try {
 +          serBlockSize += sideInputIterator.getNumSerializedBytes();
 +        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
 +          serBlockSize = -1;
 +        }
 +        try {
 +          encodedBlockSize += sideInputIterator.getNumEncodedBytes();
 +        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
 +          encodedBlockSize = -1;
          }
-       } catch (final InterruptedException | ExecutionException e) {
+       } catch (final InterruptedException e) {
+         Thread.currentThread().interrupt();
          throw new BlockFetchException(e);
++      } catch (final ExecutionException e1) {
++        throw new RuntimeException("Failed while reading side input from other stages "
+ e1);
        }
 +    });
 +  }
 +
 +  /**
 +   * As a preprocessing of side input data, get intra stage side input
 +   * and form a map of source transform-side input.
 +   * Assumption: intra stage side input denotes a data element initially received
 +   *              via side input reader from other stages.
 +   *
 +   * @param task the task which receives the data element marked as side input.
 +   * @param sideInputMap the map of source transform-side input to build.
 +   */
 +  private void sideInputFromThisStage(final Task task, final Map<Transform, Object>
sideInputMap) {
 +    getTaskDataHandler(task).getSideInputFromThisStage().forEach(input -> {
 +      // because sideInput is only 1 element in the outputCollector
 +      Object sideInput = input.remove();
 +      final RuntimeEdge inEdge = input.getSideInputRuntimeEdge();
 +      final Transform srcTransform;
 +      if (inEdge instanceof PhysicalStageEdge) {
 +        srcTransform = ((OperatorVertex) ((PhysicalStageEdge) inEdge).getSrcVertex()).getTransform();
 +      } else {
 +        srcTransform = ((OperatorTask) inEdge.getSrc()).getTransform();
 +      }
 +      sideInputMap.put(srcTransform, sideInput);
 +    });
 +  }
  
 -      // Check whether there is any output data from the transform and write the output
of this task to the writer.
 -      final List output = outputCollector.collectOutputList();
 -      if (!output.isEmpty() && physicalTaskIdToOutputWriterMap.containsKey(physicalTaskId))
{
 -        final long writeStartTime = System.currentTimeMillis();
 -        physicalTaskIdToOutputWriterMap.get(physicalTaskId).forEach(outputWriter -> outputWriter.write(output));
 -        final long writeEndTime = System.currentTimeMillis();
 -        accumulatedWriteTime += writeEndTime - writeStartTime;
 -      } // If else, this is a sink task.
 +  /**
 +   * Executes the task group.
 +   */
 +  public void execute() {
 +    final Map<String, Object> metric = new HashMap<>();
 +    metricCollector.beginMeasurement(taskGroupId, metric);
 +    long boundedSrcReadStartTime = 0;
 +    long boundedSrcReadEndTime = 0;
 +    long inputReadStartTime = 0;
 +    long inputReadEndTime = 0;
 +    if (isExecutionRequested) {
 +      throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
 +    } else {
 +      isExecutionRequested = true;
      }
 -    transform.close();
 +    taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.EXECUTING, Optional.empty(),
Optional.empty());
 +    LOG.info("{} Executing!", taskGroupId);
 +
 +    // Prepare input data from bounded source.
 +    boundedSrcReadStartTime = System.currentTimeMillis();
 +    prepareInputFromSource();
 +    boundedSrcReadEndTime = System.currentTimeMillis();
 +    metric.put("BoundedSourceReadTime(ms)", boundedSrcReadEndTime - boundedSrcReadStartTime);
 +
 +    // Prepare input data from other stages.
 +    inputReadStartTime = System.currentTimeMillis();
 +    prepareInputFromOtherStages();
 +
 +    // Execute the TaskGroup DAG.
 +    try {
 +      srcIteratorIdToDataHandlersMap.forEach((srcIteratorId, dataHandlers) -> {
 +        Iterator iterator = idToSrcIteratorMap.get(srcIteratorId);
 +        iterator.forEachRemaining(element -> {
 +          for (final TaskDataHandler dataHandler : dataHandlers) {
 +            runTask(dataHandler, element);
 +          }
 +        });
 +      });
  
 -    metric.put("InputReadTime(ms)", readFutureEndTime - readStartTime + accumulatedBlockedReadTime);
 -    final long transformEndTime = System.currentTimeMillis();
 -    metric.put("TransformTime(ms)",
 -        transformEndTime - readFutureEndTime - accumulatedWriteTime - accumulatedBlockedReadTime);
 +      // Process data from other stages.
 +      for (int currPartition = 0; currPartition < numPartitions; currPartition++) {
 +        Pair<String, DataUtil.IteratorWithNumBytes> idToIteratorPair = partitionQueue.take();
 +        final String iteratorId = idToIteratorPair.left();
 +        final DataUtil.IteratorWithNumBytes iterator = idToIteratorPair.right();
 +        List<TaskDataHandler> dataHandlers = iteratorIdToDataHandlersMap.get(iteratorId);
 +        iterator.forEachRemaining(element -> {
 +          for (final TaskDataHandler dataHandler : dataHandlers) {
 +            runTask(dataHandler, element);
 +          }
 +        });
  
 -    // Check whether there is any output data from the transform and write the output of
this task to the writer.
 -    final List<Long> writtenBytesList = new ArrayList<>();
 -    final List output = outputCollector.collectOutputList();
 -    if (physicalTaskIdToOutputWriterMap.containsKey(physicalTaskId)) {
 -      for (final OutputWriter outputWriter : physicalTaskIdToOutputWriterMap.get(physicalTaskId))
{
 -        if (!output.isEmpty()) {
 -          outputWriter.write(output);
 +        // Collect metrics on block size if possible.
 +        try {
 +          serBlockSize += iterator.getNumSerializedBytes();
 +        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
 +          serBlockSize = -1;
 +        } catch (final IllegalStateException e) {
 +          LOG.error("Failed to get the number of bytes of serialized data - the data is
not ready yet ", e);
 +        }
 +        try {
 +          encodedBlockSize += iterator.getNumEncodedBytes();
 +        } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
 +          encodedBlockSize = -1;
 +        } catch (final IllegalStateException e) {
 +          LOG.error("Failed to get the number of bytes of encoded data - the data is not
ready yet ", e);
          }
 -        outputWriter.close();
 -        final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
 -        writtenBytes.ifPresent(writtenBytesList::add);
        }
 -    } else {
 -      LOG.info("This is a sink task: {}", physicalTaskId);
 +      inputReadEndTime = System.currentTimeMillis();
 +      metric.put("InputReadTime(ms)", inputReadEndTime - inputReadStartTime);
 +
 +      // Process intra-TaskGroup data.
 +      // Intra-TaskGroup data comes from outputCollectors of this TaskGroup's Tasks.
 +      initializeOutputToChildrenDataHandlersMap();
 +      while (!finishedAllTasks()) {
 +        outputToChildrenDataHandlersMap.forEach((outputCollector, childrenDataHandlers)
-> {
 +          // Get the task that has this outputCollector as its output outputCollector
 +          Task outputCollectorOwnerTask = taskDataHandlers.stream()
 +              .filter(dataHandler -> dataHandler.getOutputCollector() == outputCollector)
 +              .findFirst().get().getTask();
 +
 +          // Before consuming the output of outputCollectorOwnerTask as input,
 +          // close transform if it is OperatorTransform.
 +          closeTransform(outputCollectorOwnerTask);
 +
 +          // Set outputCollectorOwnerTask as finished.
 +          finishedTaskIds.add(getPhysicalTaskId(outputCollectorOwnerTask.getId()));
 +
 +          while (!outputCollector.isEmpty()) {
 +            final Object element = outputCollector.remove();
 +
 +            // Pass outputCollectorOwnerTask's output to its children tasks recursively.
 +            if (!childrenDataHandlers.isEmpty()) {
 +              for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
 +                runTask(childDataHandler, element);
 +              }
 +            }
 +
 +            // Write element-wise to OutputWriters if any and close the OutputWriters.
 +            if (hasOutputWriter(outputCollectorOwnerTask)) {
 +              // If outputCollector isn't empty(if closeTransform produced some output),
 +              // write them element-wise to OutputWriters.
 +              List<OutputWriter> outputWritersOfTask =
 +                  getTaskDataHandler(outputCollectorOwnerTask).getOutputWriters();
 +              outputWritersOfTask.forEach(outputWriter -> outputWriter.writeElement(element));
 +            }
 +          }
 +
 +          if (hasOutputWriter(outputCollectorOwnerTask)) {
 +            writeAndCloseOutputWriters(outputCollectorOwnerTask);
 +          }
 +        });
 +        updateOutputToChildrenDataHandlersMap();
 +      }
 +    } catch (final BlockWriteException ex2) {
 +      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
 +          Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
 +      LOG.info("{} Execution Failed (Recoverable: output write failure)! Exception: {}",
 +          taskGroupId, ex2.toString());
 +    } catch (final Exception e) {
 +      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
 +          Optional.empty(), Optional.empty());
 +      LOG.info("{} Execution Failed! Exception: {}",
 +          taskGroupId, e.toString());
 +      throw new RuntimeException(e);
      }
 -    final long writeEndTime = System.currentTimeMillis();
 -    metric.put("OutputTime(ms)", writeEndTime - transformEndTime + accumulatedWriteTime);
 -    putReadBytesMetric(blockSizeAvailable, accumulatedSerializedBlockSize, accumulatedEncodedBlockSize,
metric);
 -    putWrittenBytesMetric(writtenBytesList, metric);
  
 -    metricCollector.endMeasurement(physicalTaskId, metric);
 +    // Put TaskGroup-unit metrics.
 +    final boolean available = serBlockSize >= 0;
 +    putReadBytesMetric(available, serBlockSize, encodedBlockSize, metric);
 +    metricCollector.endMeasurement(taskGroupId, metric);
 +    if (logicalTaskIdPutOnHold == null) {
 +      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.COMPLETE, Optional.empty(),
Optional.empty());
 +    } else {
 +      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.ON_HOLD,
 +          Optional.of(logicalTaskIdPutOnHold),
 +          Optional.empty());
 +    }
 +    LOG.info("{} Complete!", taskGroupId);
    }
  
    /**
 -   * Pass on the data to the following tasks.
 -   * @param task the task to carry on the data.
 +   * Recursively executes a task with the input data element.
 +   *
 +   * @param dataHandler TaskDataHandler of a task to execute.
 +   * @param dataElement input data element to process.
     */
 -  private void launchMetricCollectionBarrierTask(final MetricCollectionBarrierTask task)
{
 -    final String physicalTaskId = getPhysicalTaskId(task.getId());
 -    final Map<String, Object> metric = new HashMap<>();
 -    metricCollector.beginMeasurement(physicalTaskId, metric);
 -    long accumulatedSerializedBlockSize = 0;
 -    long accumulatedEncodedBlockSize = 0;
 -    boolean blockSizeAvailable = true;
 -
 -    final long readStartTime = System.currentTimeMillis();
 -    final BlockingQueue<DataUtil.IteratorWithNumBytes> dataQueue = new LinkedBlockingQueue<>();
 -    final AtomicInteger sourceParallelism = new AtomicInteger(0);
 -    physicalTaskIdToInputReaderMap.get(physicalTaskId).stream().filter(inputReader ->
!inputReader.isSideInputReader())
 -        .forEach(inputReader -> {
 -          sourceParallelism.getAndAdd(inputReader.getSourceParallelism());
 -          inputReader.read().forEach(compFuture -> compFuture.thenAccept(dataQueue::add));
 -        });
 +  private void runTask(final TaskDataHandler dataHandler, final Object dataElement) {
 +    final Task task = dataHandler.getTask();
 +    final OutputCollectorImpl outputCollector = dataHandler.getOutputCollector();
-     final String physicalTaskId = getPhysicalTaskId(task.getId());
 +
 +    // Process element-wise depending on the Task type
 +    if (task instanceof BoundedSourceTask) {
 +      if (dataElement == null) { // null used for Beam VoidCoders
 +        final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
 +        outputCollector.emit(nullForVoidCoder);
 +      } else {
 +        outputCollector.emit(dataElement);
 +      }
 +    } else if (task instanceof OperatorTask) {
 +      final Transform transform = ((OperatorTask) task).getTransform();
 +      transform.onData(dataElement);
 +    } else if (task instanceof MetricCollectionBarrierTask) {
 +      if (dataElement == null) { // null used for Beam VoidCoders
 +        final List<Object> nullForVoidCoder = Collections.singletonList(dataElement);
 +        outputCollector.emit(nullForVoidCoder);
 +      } else {
 +        outputCollector.emit(dataElement);
 +      }
 +      setTaskPutOnHold((MetricCollectionBarrierTask) task);
 +    } else {
 +      throw new UnsupportedOperationException("This type  of Task is not supported");
 +    }
  
 -    final List data = new ArrayList<>();
 -    for (int srcTaskNum = 0; srcTaskNum < sourceParallelism.get(); srcTaskNum++) {
 -      try {
 -        final DataUtil.IteratorWithNumBytes availableData = dataQueue.take();
 -        availableData.forEachRemaining(data::add);
 -        if (blockSizeAvailable) {
 -          try {
 -            accumulatedSerializedBlockSize += availableData.getNumSerializedBytes();
 -            accumulatedEncodedBlockSize += availableData.getNumEncodedBytes();
 -          } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e)
{
 -            blockSizeAvailable = false;
 -          }
 +    // For the produced output
 +    while (!outputCollector.isEmpty()) {
 +      final Object element = outputCollector.remove();
 +
 +      // Pass output to its children recursively.
 +      List<TaskDataHandler> childrenDataHandlers = dataHandler.getChildren();
 +      if (!childrenDataHandlers.isEmpty()) {
 +        for (final TaskDataHandler childDataHandler : childrenDataHandlers) {
 +          runTask(childDataHandler, element);
          }
 -      } catch (final InterruptedException e) {
 -        Thread.currentThread().interrupt();
 -        throw new BlockFetchException(e);
        }
 -    }
 -    final long readEndTime = System.currentTimeMillis();
 -    metric.put("InputReadTime(ms)", readEndTime - readStartTime);
  
 -    final List<Long> writtenBytesList = new ArrayList<>();
 -    for (final OutputWriter outputWriter : physicalTaskIdToOutputWriterMap.get(physicalTaskId))
{
 -      outputWriter.write(data);
 -      outputWriter.close();
 -      final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
 -      writtenBytes.ifPresent(writtenBytesList::add);
 +      // Write element-wise to OutputWriters if any
 +      if (hasOutputWriter(task)) {
 +        List<OutputWriter> outputWritersOfTask = dataHandler.getOutputWriters();
 +        outputWritersOfTask.forEach(outputWriter -> outputWriter.writeElement(element));
 +      }
      }
 -    final long writeEndTime  = System.currentTimeMillis();
 -    metric.put("OutputWriteTime(ms)", writeEndTime - readEndTime);
 -    putReadBytesMetric(blockSizeAvailable, accumulatedSerializedBlockSize, accumulatedEncodedBlockSize,
metric);
 -    putWrittenBytesMetric(writtenBytesList, metric);
 -    metricCollector.endMeasurement(physicalTaskId, metric);
    }
  
    /**

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message