nemo-commits mailing list archives

From: jeongy...@apache.org
Subject: [incubator-nemo] 11/40: merge pipelining
Date: Fri, 06 Apr 2018 02:35:51 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch skew_exp
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git

commit 2e555b14a7a28a5abb990748671ce5fed9f9a1de
Merge: d219bff 0f5d66e
Author: sanha <sanhaleehana@naver.com>
AuthorDate: Thu Mar 8 19:07:43 2018 +0900

    merge pipelining

 .../ir/executionproperty/ExecutionProperty.java    |   1 -
 .../common/ir/vertex/transform/RelayTransform.java |  10 +-
 .../nemo/common/ir/vertex/transform/Transform.java |   7 +-
 .../frontend/beam/NemoPipelineVisitor.java         |   3 -
 .../beam/transform/CreateViewTransform.java        |  13 +-
 .../frontend/beam/transform/DoTransform.java       |  36 +-
 .../frontend/beam/transform/FlattenTransform.java  |   9 +-
 .../beam/transform/GroupByKeyTransform.java        |  27 +-
 .../frontend/beam/transform/WindowTransform.java   |  10 +-
 .../frontend/spark/transform/CollectTransform.java |  27 +-
 .../frontend/spark/transform/FlatMapTransform.java |  22 +-
 .../spark/transform/GroupByKeyTransform.java       |  19 +-
 .../spark/transform/MapToPairTransform.java        |  23 +-
 .../frontend/spark/transform/MapTransform.java     |  15 +-
 .../spark/transform/ReduceByKeyTransform.java      |  35 +-
 .../frontend/spark/transform/ReduceTransform.java  |  45 +-
 .../optimizer/examples/EmptyComponents.java        |   3 +-
 .../nemo/examples/beam/AlternatingLeastSquare.java |   4 +-
 .../snu/nemo/examples/beam/GenericSourceSink.java  |   2 -
 .../beam/MultinomialLogisticRegression.java        |   8 +-
 .../beam/AlternatingLeastSquareITCase.java         |   4 +-
 .../snu/nemo/examples/beam/BroadcastITCase.java    |   4 +-
 .../snu/nemo/examples/beam/MapReduceITCase.java    |   8 +-
 .../beam/MultinomialLogisticRegressionITCase.java  |   1 +
 ...ive.java => DataSkewPolicyParallelismFive.java} |   4 +-
 ...va => DisaggregationPolicyParallelismFive.java} |   5 +-
 ...simFive.java => PadoPolicyParallelismFive.java} |   4 +-
 ...ive.java => SailfishPolicyParallelismFive.java} |   4 +-
 .../nemo/runtime/executor/TaskGroupExecutor.java   | 778 +++++++++++++--------
 .../runtime/executor/TaskGroupStateManager.java    | 152 +---
 .../snu/nemo/runtime/executor/data/DataUtil.java   |  12 +-
 .../executor/data/stores/LocalBlockStore.java      |   1 +
 .../executor/datatransfer/DataTransferFactory.java |  26 -
 .../runtime/executor/datatransfer/InputReader.java |   3 +
 .../executor/datatransfer/OutputCollectorImpl.java |  63 +-
 .../executor/datatransfer/OutputWriter.java        |  22 +-
 .../executor/datatransfer/TaskDataHandler.java     |  94 +++
 .../nemo/runtime/master/BlockManagerMaster.java    |   2 +-
 .../edu/snu/nemo/runtime/master/BlockMetadata.java |   2 +-
 tests/pom.xml                                      |   2 +-
 .../runtime/executor/TaskGroupExecutorTest.java    |  57 +-
 .../executor/datatransfer/DataTransferTest.java    |   7 +-
 .../runtime/master/BlockManagerMasterTest.java     |   1 -
 43 files changed, 901 insertions(+), 674 deletions(-)
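
(The four "...Five.java => ...ParallelismFive.java" rename entries above fix
a typo in the example policy class names, "Parallelsim" -> "Parallelism";
the ITCase hunks below update their references to match.)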

diff --cc examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
index 3f2cb0e,2d96b6e..c4a8d5a
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/AlternatingLeastSquareITCase.java
@@@ -76,9 -77,7 +76,9 @@@ public final class AlternatingLeastSqua
    public void testPado() throws Exception {
      JobLauncher.main(builder
          .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado")
 +        .addUserMain(AlternatingLeastSquare.class.getCanonicalName())
 +        .addUserArgs(input, numFeatures, numIteration, lambda, output)
-         .addOptimizationPolicy(PadoPolicyParallelsimFive.class.getCanonicalName())
+         .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
          .build());
    }
  }
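
Every ITCase hunk in this merge makes the same change: the test now passes
the user main class and user arguments explicitly, and points at the
typo-fixed policy class. Stripped of diff markers, the launch pattern is
roughly the sketch below; `builder` and the fixture fields (input,
numFeatures, numIteration, lambda, output) are assumed from the test's setup.

    // Sketch of the common ITCase launch pattern after this merge. The
    // classes are the ones referenced in the hunk above; the argument
    // values come from the test fixture.
    JobLauncher.main(builder
        .addJobId(AlternatingLeastSquareITCase.class.getSimpleName() + "_pado")
        .addUserMain(AlternatingLeastSquare.class.getCanonicalName())
        .addUserArgs(input, numFeatures, numIteration, lambda, output)
        .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
        .build());
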
diff --cc examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
index f6314e4,b918960..92045d3
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/BroadcastITCase.java
@@@ -74,9 -71,7 +74,9 @@@ public final class BroadcastITCase 
    public void testPado() throws Exception {
      JobLauncher.main(builder
          .addJobId(BroadcastITCase.class.getSimpleName() + "_pado")
 +        .addUserMain(Broadcast.class.getCanonicalName())
 +        .addUserArgs(inputFilePath, outputFilePath)
-         .addOptimizationPolicy(PadoPolicyParallelsimFive.class.getCanonicalName())
+         .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
          .build());
    }
  }
diff --cc examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
index 83ef755,1f96af5..b113bbc
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MapReduceITCase.java
@@@ -74,9 -70,7 +74,9 @@@ public final class MapReduceITCase 
    public void testSailfish() throws Exception {
      JobLauncher.main(builder
          .addJobId(MapReduceITCase.class.getSimpleName() + "_sailfish")
 +        .addUserMain(MapReduce.class.getCanonicalName())
 +        .addUserArgs(inputFilePath, outputFilePath)
-         .addOptimizationPolicy(SailfishPolicyParallelsimFive.class.getCanonicalName())
+         .addOptimizationPolicy(SailfishPolicyParallelismFive.class.getCanonicalName())
          .build());
    }
  
@@@ -84,9 -78,7 +84,9 @@@
    public void testDisagg() throws Exception {
      JobLauncher.main(builder
          .addJobId(MapReduceITCase.class.getSimpleName() + "_disagg")
 +        .addUserMain(MapReduce.class.getCanonicalName())
 +        .addUserArgs(inputFilePath, outputFilePath)
-         .addOptimizationPolicy(DisaggregationPolicyParallelsimFive.class.getCanonicalName())
+         .addOptimizationPolicy(DisaggregationPolicyParallelismFive.class.getCanonicalName())
          .build());
    }
  
@@@ -94,9 -86,7 +94,9 @@@
    public void testPado() throws Exception {
      JobLauncher.main(builder
          .addJobId(MapReduceITCase.class.getSimpleName() + "_pado")
 +        .addUserMain(MapReduce.class.getCanonicalName())
 +        .addUserArgs(inputFilePath, outputFilePath)
-         .addOptimizationPolicy(PadoPolicyParallelsimFive.class.getCanonicalName())
+         .addOptimizationPolicy(PadoPolicyParallelismFive.class.getCanonicalName())
          .build());
    }
  
@@@ -108,9 -98,7 +108,9 @@@
    public void testDataSkew() throws Exception {
      JobLauncher.main(builder
          .addJobId(MapReduceITCase.class.getSimpleName() + "_dataskew")
 +        .addUserMain(MapReduce.class.getCanonicalName())
 +        .addUserArgs(inputFilePath, outputFilePath)
-         .addOptimizationPolicy(DataSkewPolicyParallelsimFive.class.getCanonicalName())
+         .addOptimizationPolicy(DataSkewPolicyParallelismFive.class.getCanonicalName())
          .build());
    }
  }
diff --cc examples/beam/src/test/java/edu/snu/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
index c37cbbc,a51a751..59313e0
--- a/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
+++ b/examples/beam/src/test/java/edu/snu/nemo/examples/beam/MultinomialLogisticRegressionITCase.java
@@@ -15,9 -15,10 +15,10 @@@
   */
  package edu.snu.nemo.examples.beam;
  
 -import edu.snu.nemo.examples.beam.policy.PadoPolicyParallelismFive;
  import edu.snu.nemo.client.JobLauncher;
  import edu.snu.nemo.common.test.ArgBuilder;
 +import edu.snu.nemo.compiler.optimizer.policy.DefaultPolicy;
+ 
  import org.junit.Before;
  import org.junit.Test;
  import org.junit.runner.RunWith;
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
index 89284e2,bc8ac79..5bb5074
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
@@@ -170,76 -221,50 +221,49 @@@ public final class TaskGroupExecutor 
    }
  
    /**
-    * Executes the task group.
+    * Add output outputCollectors to each {@link Task}.
+    * Output outputCollector denotes the one and only one outputCollector of this task.
+    * Check the outgoing edges that will use this outputCollector,
+    * and set this outputCollector as side input if any one of the edges uses this outputCollector as side input.
+    *
+    * @param task the Task to add output outputCollectors to.
     */
-   public void execute() {
-     LOG.info("{} Execution Started!", taskGroupId);
-     if (isExecutionRequested) {
-       throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
-     } else {
-       isExecutionRequested = true;
-     }
- 
-     taskGroupStateManager.onTaskGroupStateChanged(
-         TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
+   private void setOutputCollector(final Task task) {
+     final TaskDataHandler dataHandler = taskToDataHandlerMap.get(task);
+     final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
+     final String physicalTaskId = getPhysicalTaskId(task.getId());
  
-     taskGroupDag.topologicalDo(task -> {
-       final String physicalTaskId = getPhysicalTaskId(task.getId());
-       taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.EXECUTING, Optional.empty());
-       try {
-         if (task instanceof BoundedSourceTask) {
-           launchBoundedSourceTask((BoundedSourceTask) task);
-           taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-           LOG.info("{} Execution Complete!", taskGroupId);
-         } else if (task instanceof OperatorTask) {
-           launchOperatorTask((OperatorTask) task);
-           taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.COMPLETE, Optional.empty());
-           LOG.info("{} Execution Complete!", taskGroupId);
-         } else if (task instanceof MetricCollectionBarrierTask) {
-           launchMetricCollectionBarrierTask((MetricCollectionBarrierTask) task);
-           taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.ON_HOLD, Optional.empty());
-           LOG.info("{} Execution Complete!", taskGroupId);
-         } else {
-           throw new UnsupportedOperationException(task.toString());
-         }
-       } catch (final BlockFetchException ex) {
-         taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.FAILED_RECOVERABLE,
-             Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
-         LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
-             new Object[] {taskGroupId, ex.toString()});
-       } catch (final BlockWriteException ex2) {
-         taskGroupStateManager.onTaskStateChanged(physicalTaskId, TaskState.State.FAILED_RECOVERABLE,
-             Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
-         LOG.warn("{} Execution Failed (Recoverable)! Exception: {}",
-             new Object[] {taskGroupId, ex2.toString()});
-       } catch (final Exception e) {
-         taskGroupStateManager.onTaskStateChanged(
-             physicalTaskId, TaskState.State.FAILED_UNRECOVERABLE, Optional.empty());
-         throw new RuntimeException(e);
+     taskGroupDag.getOutgoingEdgesOf(task).forEach(outEdge -> {
+       if (outEdge.isSideInput()) {
+         outputCollector.setSideInputRuntimeEdge(outEdge);
+         outputCollector.setAsSideInputFor(physicalTaskId);
+         LOG.info("log: {} {} Marked as accepting sideInput(edge {})",
+             taskGroupId, physicalTaskId, outEdge.getId());
        }
      });
+ 
+     dataHandler.setOutputCollector(outputCollector);
+     LOG.info("log: {} {} Added OutputPipe", taskGroupId, physicalTaskId);
    }
  
-   /**
-    * Processes a BoundedSourceTask.
-    *
-    * @param boundedSourceTask the bounded source task to execute
-    * @throws Exception occurred during input read.
-    */
-   private void launchBoundedSourceTask(final BoundedSourceTask boundedSourceTask) throws Exception {
-     final String physicalTaskId = getPhysicalTaskId(boundedSourceTask.getId());
-     final Map<String, Object> metric = new HashMap<>();
-     metricCollector.beginMeasurement(physicalTaskId, metric);
+   private boolean hasOutputWriter(final Task task) {
+     return !taskToDataHandlerMap.get(task).getOutputWriters().isEmpty();
+   }
  
-     final long readStartTime = System.currentTimeMillis();
-     final Readable readable = boundedSourceTask.getReadable();
-     final Iterable readData = readable.read();
-     final long readEndTime = System.currentTimeMillis();
-     metric.put("BoundedSourceReadTime(ms)", readEndTime - readStartTime);
+   private void setTaskPutOnHold(final MetricCollectionBarrierTask task) {
+     final String physicalTaskId = getPhysicalTaskId(task.getId());
+     logicalTaskIdPutOnHold = RuntimeIdGenerator.getLogicalTaskIdIdFromPhysicalTaskId(physicalTaskId);
+   }
  
+   private void writeAndCloseOutputWriters(final Task task) {
+     final String physicalTaskId = getPhysicalTaskId(task.getId());
      final List<Long> writtenBytesList = new ArrayList<>();
-     for (final OutputWriter outputWriter : physicalTaskIdToOutputWriterMap.get(physicalTaskId)) {
-       outputWriter.write(readData);
+     final Map<String, Object> metric = new HashMap<>();
+     metricCollector.beginMeasurement(physicalTaskId, metric);
+     final long writeStartTime = System.currentTimeMillis();
+ 
+     taskToDataHandlerMap.get(task).getOutputWriters().forEach(outputWriter -> {
+       LOG.info("Write and close outputWriter of task {}", getPhysicalTaskId(task.getId()));
 -      outputWriter.write();
        outputWriter.close();
        final Optional<Long> writtenBytes = outputWriter.getWrittenBytes();
        writtenBytes.ifPresent(writtenBytesList::add);
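
The hunks above replace the old batch-style execute() loop with per-task
plumbing: each Task gets exactly one OutputCollectorImpl, which is marked
as a side input when any outgoing edge consumes it that way, and output
writers are now flushed and closed per task. A minimal sketch of that
write-and-close path, assuming the taskToDataHandlerMap field from the
diff and eliding the metric bookkeeping:

    // Close each OutputWriter of the task once, recording written-byte
    // counts whenever a writer can report them.
    final List<Long> writtenBytesList = new ArrayList<>();
    taskToDataHandlerMap.get(task).getOutputWriters().forEach(outputWriter -> {
      outputWriter.close();
      outputWriter.getWrittenBytes().ifPresent(writtenBytesList::add);
    });
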
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
index 861451a,e34dfa2..ee0aa46
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/DataUtil.java
@@@ -18,10 -18,10 +18,12 @@@ package edu.snu.nemo.runtime.executor.d
  import com.google.common.io.CountingInputStream;
  import edu.snu.nemo.common.DirectByteArrayOutputStream;
  import edu.snu.nemo.common.coder.Coder;
 +import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 +import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
  import edu.snu.nemo.runtime.executor.data.streamchainer.StreamChainer;
  import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
  
  import java.io.*;
  import java.util.*;
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalBlockStore.java
index 4b419cc,61dd116..2f1b9f1
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalBlockStore.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/stores/LocalBlockStore.java
@@@ -49,29 -50,12 +49,30 @@@ public abstract class LocalBlockStore e
    }
  
    /**
 -   * @see BlockStore#putPartitions(String, Iterable)
 +   * @see BlockStore#write(String, Serializable, Object).
     */
    @Override
 -  public final <K extends Serializable>
 -  Optional<List<Long>> putPartitions(final String blockId,
 -                                     final Iterable<NonSerializedPartition<K>> partitions)
 +  public final <K extends Serializable> void write(final String blockId,
 +                                                   final K key,
 +                                                   final Object element) throws BlockWriteException {
 +    try {
 +      final Block<K> block = blockMap.get(blockId);
 +      if (block == null) {
 +        throw new BlockWriteException(new Throwable("The block " + blockId + "is not created yet."));
 +      }
 +      block.write(key, element);
 +    } catch (final IOException e) {
++      e.printStackTrace();
 +      throw new BlockWriteException(new Throwable("Failed to store partitions to this block."));
 +    }
 +  }
 +
 +  /**
 +   * @see BlockStore#writePartitions(String, Iterable)
 +   */
 +  @Override
 +  public final <K extends Serializable> void writePartitions(final String blockId,
 +                                                             final Iterable<NonSerializedPartition<K>> partitions)
        throws BlockWriteException {
      try {
        final Block<K> block = blockMap.get(blockId);
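
The store API change above replaces the bulk putPartitions(String,
Iterable) call with an element-granularity write(String, K, Object) plus
writePartitions(String, Iterable). A hypothetical caller of the new
per-element path; only the write signature and BlockWriteException come
from the diff, the helper itself is illustrative:

    // Illustrative: write one element into an existing block. The block must
    // have been created beforehand, or BlockWriteException is thrown.
    static <K extends Serializable> void writeOne(final BlockStore store,
                                                  final String blockId,
                                                  final K key,
                                                  final Object element) {
      store.write(blockId, key, element);
    }
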
diff --cc runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 59d55c3,6f9a594..2d255ee
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@@ -37,20 -38,12 +37,21 @@@ public final class OutputWriter extend
    private final String srcVertexId;
    @Nullable private final IRVertex dstIrVertex;
    private final DataStoreProperty.Value blockStoreValue;
 -  private final Map<PartitionerProperty.Value, Partitioner> partitionerMap;
 -  private final List<Long> accumulatedPartitionSizeInfo;
 -  private final List<Long> writtenBytes;
 +  private Optional<Long> writtenBytes;
    private final BlockManagerWorker blockManagerWorker;
 -  private final List<Object> outputList;
 +  private final Partitioner partitioner;
++  private final boolean writable;
  
 +  /**
 +   * Constructor.
 +   *
 +   * @param hashRangeMultiplier the {@link edu.snu.nemo.conf.JobConf.HashRangeMultiplier}.
 +   * @param srcTaskIdx          the index of the source task.
 +   * @param srcRuntimeVertexId  the ID of the source vertex.
 +   * @param dstIrVertex         the destination IR vertex.
 +   * @param runtimeEdge         the {@link RuntimeEdge}.
 +   * @param blockManagerWorker  the {@link BlockManagerWorker}.
 +   */
    public OutputWriter(final int hashRangeMultiplier,
                        final int srcTaskIdx,
                        final String srcRuntimeVertexId,
@@@ -65,45 -58,73 +66,40 @@@
      this.dstIrVertex = dstIrVertex;
      this.blockManagerWorker = blockManagerWorker;
      this.blockStoreValue = runtimeEdge.getProperty(ExecutionProperty.Key.DataStore);
 -    this.partitionerMap = new HashMap<>();
 -    this.outputList = new ArrayList<>();
 -    this.writtenBytes = new ArrayList<>();
 -    // TODO #511: Refactor metric aggregation for (general) run-rime optimization.
 -    this.accumulatedPartitionSizeInfo = new ArrayList<>();
 -    partitionerMap.put(PartitionerProperty.Value.IntactPartitioner, new IntactPartitioner());
 -    partitionerMap.put(PartitionerProperty.Value.HashPartitioner, new HashPartitioner());
 -    partitionerMap.put(PartitionerProperty.Value.DataSkewHashPartitioner,
 -        new DataSkewHashPartitioner(hashRangeMultiplier));
 -    blockManagerWorker.createBlock(blockId, blockStoreValue);
 -  }
 -
 -  public void writeElement(final Object element) {
 -    outputList.add(element);
 -  }
 -
 -  /**
 -   * Writes output data depending on the communication pattern of the edge.
 -   **/
 -  public void write() {
 -    // Aggregate element to form the inter-Stage data.
 -    final Boolean isDataSizeMetricCollectionEdge = MetricCollectionProperty.Value.DataSkewRuntimePass
 -        .equals(runtimeEdge.getProperty(ExecutionProperty.Key.MetricCollection));
 +    this.writtenBytes = Optional.empty();
  
 -    // Group the data into blocks.
 +    // Setup partitioner
 +    final int dstParallelism = getDstParallelism();
 +    final KeyExtractor keyExtractor = runtimeEdge.getProperty(ExecutionProperty.Key.KeyExtractor);
      final PartitionerProperty.Value partitionerPropertyValue =
          runtimeEdge.getProperty(ExecutionProperty.Key.Partitioner);
 -    final int dstParallelism = getDstParallelism();
 -
 -    final Partitioner partitioner = partitionerMap.get(partitionerPropertyValue);
 -    if (partitioner == null) {
 -      throw new UnsupportedPartitionerException(
 -          new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
 +    switch (partitionerPropertyValue) {
 +      case IntactPartitioner:
 +        this.partitioner = new IntactPartitioner();
 +        break;
 +      case HashPartitioner:
 +        this.partitioner = new HashPartitioner(dstParallelism, keyExtractor);
 +        break;
 +      case DataSkewHashPartitioner:
 +        this.partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, keyExtractor);
 +        break;
 +      default:
 +        throw new UnsupportedPartitionerException(
 +            new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
      }
 -
 -    final KeyExtractor keyExtractor = runtimeEdge.getProperty(ExecutionProperty.Key.KeyExtractor);
 -    final List<Partition> partitionsToWrite;
 +    blockManagerWorker.createBlock(blockId, blockStoreValue);
-   }
  
-   /**
-    * Writes output data depending on the communication pattern of the edge.
-    *
-    * @param dataToWrite An iterable for the elements to be written.
-    */
-   public void write(final Iterable dataToWrite) {
      final DuplicateEdgeGroupPropertyValue duplicateDataProperty =
          runtimeEdge.getProperty(ExecutionProperty.Key.DuplicateEdgeGroup);
-     if (duplicateDataProperty == null
 -    if (duplicateDataProperty != null
 -        && !duplicateDataProperty.getRepresentativeEdgeId().equals(runtimeEdge.getId())
 -        && duplicateDataProperty.getGroupSize() > 1) {
 -      partitionsToWrite = partitioner.partition(Collections.emptyList(), dstParallelism, keyExtractor);
 -    } else {
 -      partitionsToWrite = partitioner.partition(outputList, dstParallelism, keyExtractor);
 -    }
 -
 -    // Write the grouped blocks into partitions.
 -    // TODO #492: Modularize the data communication pattern.
 -    final DataCommunicationPatternProperty.Value comValue =
 -        runtimeEdge.getProperty(ExecutionProperty.Key.DataCommunicationPattern);
++    writable = duplicateDataProperty == null
 +        || duplicateDataProperty.getRepresentativeEdgeId().equals(runtimeEdge.getId())
-         || duplicateDataProperty.getGroupSize() <= 1) {
-       dataToWrite.forEach(element -> {
-         blockManagerWorker.write(blockId, partitioner.partition(element), element, blockStoreValue);
-       });
-     } // If else, does not need to write because the data is duplicated.
++        || duplicateDataProperty.getGroupSize() <= 1;
++  }
+ 
 -    if (DataCommunicationPatternProperty.Value.OneToOne.equals(comValue)) {
 -      writeOneToOne(partitionsToWrite);
 -    } else if (DataCommunicationPatternProperty.Value.BroadCast.equals(comValue)) {
 -      writeBroadcast(partitionsToWrite);
 -    } else if (DataCommunicationPatternProperty.Value.Shuffle.equals(comValue)) {
 -      // If the dynamic optimization which detects data skew is enabled, sort the data and write it.
 -      if (isDataSizeMetricCollectionEdge) {
 -        dataSkewWrite(partitionsToWrite);
 -      } else {
 -        writeShuffle(partitionsToWrite);
 -      }
 -    } else {
 -      throw new UnsupportedCommPatternException(new Exception("Communication pattern not supported"));
++  public void writeElement(final Object element) {
++    if (writable) {
++      blockManagerWorker.write(blockId, partitioner.partition(element), element, blockStoreValue);
+     }
    }
  
    /**
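
Net effect of the OutputWriter rework: the partitioner is now chosen once
in the constructor, the buffered outputList and the write()/write(Iterable)
flush calls are gone, and writeElement hands each element to the
BlockManagerWorker immediately (skipped only when the edge's duplicate-data
property says a representative edge writes on its behalf). That per-element
streaming is the pipelining this merge is named for. The caller-side
pattern, as in the DataTransferTest hunks below, becomes:

    // Pipelined emission: write per element, then close; there is no flush.
    dataWritten.iterator().forEachRemaining(writer::writeElement);
    writer.close();
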
diff --cc runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 4dc2e4c,0651b9b..8e7c5c4
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@@ -16,10 -16,9 +16,10 @@@
  package edu.snu.nemo.runtime.master;
  
  import edu.snu.nemo.common.exception.IllegalMessageException;
 +import edu.snu.nemo.common.exception.UnknownExecutionStateException;
+ import edu.snu.nemo.runtime.common.comm.ControlMessage;
  import edu.snu.nemo.runtime.common.exception.AbsentBlockException;
  import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
- import edu.snu.nemo.runtime.common.comm.ControlMessage;
  import edu.snu.nemo.runtime.common.message.MessageContext;
  import edu.snu.nemo.runtime.common.message.MessageEnvironment;
  import edu.snu.nemo.runtime.common.message.MessageListener;
diff --cc tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 70379ee,7834620..420e3c4
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@@ -343,7 -332,8 +342,7 @@@ public final class DataTransferTest 
        final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
      final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
            dummyEdge, sender);
-       writer.write(dataWritten);
+       dataWritten.iterator().forEachRemaining(writer::writeElement);
 -      writer.write();
        writer.close();
        dataWrittenList.add(dataWritten);
      });
@@@ -437,13 -427,15 +436,13 @@@
        final List dataWritten = getRangedNumList(0, PARALLELISM_TEN);
      final OutputWriter writer = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
            dummyEdge, sender);
-       writer.write(dataWritten);
+       dataWritten.iterator().forEachRemaining(writer::writeElement);
 -      writer.write();
        writer.close();
        dataWrittenList.add(dataWritten);
  
      final OutputWriter writer2 = new OutputWriter(HASH_RANGE_MULTIPLIER, srcTaskIndex, srcVertex.getId(), dstVertex,
            dummyEdge2, sender);
-       writer2.write(dataWritten);
+       dataWritten.iterator().forEachRemaining(writer::writeElement);
 -      writer2.write();
        writer2.close();
      });
  

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.
