nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sa...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-192] Remove IRDAG from PhysicalPlan (#108)
Date Sun, 19 Aug 2018 15:31:20 GMT
This is an automated email from the ASF dual-hosted git repository.

sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc9791  [NEMO-192] Remove IRDAG from PhysicalPlan (#108)
1fc9791 is described below

commit 1fc979162b493a72b2e1898bc189aa4c35d4b87d
Author: Jeongyoon Eo <jeongyoon.eo@spl.snu.ac.kr>
AuthorDate: Mon Aug 20 00:31:17 2018 +0900

    [NEMO-192] Remove IRDAG from PhysicalPlan (#108)
    
    JIRA: [NEMO-192: Remove IRDAG from PhysicalPlan](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-192)
    
    **Major changes:**
    - Removed IRDAG from PhysicalPlan
    
    **Minor changes to note:**
    - N/A
    
    **Tests for the changes:**
    - N/A
    
    **Other comments:**
    - Now dynamic optimization is done via StageEdge. We need a revised version of IR-based
      dynamic optimization, considering issues related to RDD caching and reshaping the IR DAG
      at run time. This will be handled with issue [NEMO-193](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-193).
    
    resolves [NEMO-192](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-192)
---
 .../nemo/compiler/backend/nemo/NemoBackend.java    |  2 +-
 .../eventhandler/DynamicOptimizationEvent.java     |  8 +--
 .../DynamicOptimizationEventHandler.java           |  4 +-
 .../runtime/common/optimizer/RunTimeOptimizer.java | 32 +++--------
 .../pass/runtime/DataSkewRuntimePass.java          | 37 ++++++-------
 .../common/optimizer/pass/runtime/RuntimePass.java |  6 +--
 .../snu/nemo/runtime/common/plan/PhysicalPlan.java | 11 ----
 .../snu/nemo/runtime/common/plan/StageEdge.java    | 62 ++++++++++++++++++++++
 .../runtime/executor/datatransfer/InputReader.java |  7 +--
 .../runtime/master/scheduler/BatchScheduler.java   | 23 +++-----
 .../runtime/common/plan/TestPlanGenerator.java     |  2 +-
 11 files changed, 108 insertions(+), 86 deletions(-)

diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 4789a5c..696acff 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -61,6 +61,6 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
-    return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), irDAG, stageDAG);
+    return new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), stageDAG);
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index 9a8b26a..341f0d1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -19,8 +19,8 @@
 package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 
 /**
  * An event for triggering dynamic optimization.
@@ -30,7 +30,7 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
   private final Object dynOptData;
   private final String taskId;
   private final String executorId;
-  private final IREdge targetEdge;
+  private final StageEdge targetEdge;
 
   /**
    * Default constructor.
@@ -42,7 +42,7 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
                                   final Object dynOptData,
                                   final String taskId,
                                   final String executorId,
-                                  final IREdge targetEdge) {
+                                  final StageEdge targetEdge) {
     this.physicalPlan = physicalPlan;
     this.taskId = taskId;
     this.dynOptData = dynOptData;
@@ -75,7 +75,7 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
     return this.dynOptData;
   }
 
-  public IREdge getTargetEdge() {
+  public StageEdge getTargetEdge() {
     return this.targetEdge;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index a642248..9cd4ca2 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -20,9 +20,9 @@ package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.optimizer.RunTimeOptimizer;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.apache.reef.wake.impl.PubSubEventHandler;
 
 import javax.inject.Inject;
@@ -51,7 +51,7 @@ public final class DynamicOptimizationEventHandler implements RuntimeEventHandle
   public void onNext(final DynamicOptimizationEvent dynamicOptimizationEvent) {
     final PhysicalPlan physicalPlan = dynamicOptimizationEvent.getPhysicalPlan();
     final Object dynOptData = dynamicOptimizationEvent.getDynOptData();
-    final IREdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
+    final StageEdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
 
     final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan, dynOptData, targetEdge);
 
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index d9ca024..9611326 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -16,17 +16,9 @@
 package edu.snu.nemo.runtime.common.optimizer;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
-import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
 
 import java.util.*;
 
@@ -48,22 +40,12 @@ public final class RunTimeOptimizer {
   public static synchronized PhysicalPlan dynamicOptimization(
           final PhysicalPlan originalPlan,
           final Object dynOptData,
-          final IREdge targetEdge) {
-    try {
-      final PhysicalPlanGenerator physicalPlanGenerator =
-          Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
-
-      // Data for dynamic optimization used in DataSkewRuntimePass
-      // is a map of <hash value, partition size>.
-      final DAG<IRVertex, IREdge> newIrDAG =
-          new DataSkewRuntimePass()
-              .apply(originalPlan.getIrDAG(), Pair.of(targetEdge, (Map<Integer, Long>) dynOptData));
-      final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(newIrDAG);
-      final PhysicalPlan physicalPlan =
-          new PhysicalPlan(RuntimeIdManager.generatePhysicalPlanId(), newIrDAG, stageDAG);
-      return physicalPlan;
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
-    }
+          final StageEdge targetEdge) {
+    // Data for dynamic optimization used in DataSkewRuntimePass
+    // is a map of <hash value, partition size>.
+    final PhysicalPlan physicalPlan =
+      new DataSkewRuntimePass()
+        .apply(originalPlan, Pair.of(targetEdge, (Map<Integer, Long>) dynOptData));
+    return physicalPlan;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 67afbad..572cbd1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,19 +16,17 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.common.DataSkewMetricFactory;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
 
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +39,7 @@ import java.util.stream.Collectors;
  * this RuntimePass identifies a number of keys with big partition sizes(skewed key)
  * and evenly redistributes data via overwriting incoming edges of destination tasks.
  */
-public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge, Map<Integer, Long>>> {
+public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, Map<Integer, Long>>> {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
   private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
   // Skewed keys denote for top n keys in terms of partition size.
@@ -72,16 +70,9 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge, Map<Inte
   }
 
   @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG,
-                                     final Pair<IREdge, Map<Integer, Long>> metricData) {
-    // get edges to optimize
-    final List<IREdge> optimizationEdges = irDAG.getVertices().stream()
-        .flatMap(v -> irDAG.getIncomingEdgesOf(v).stream())
-        .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
-            .equals(e.getPropertyValue(MetricCollectionProperty.class)))
-        .collect(Collectors.toList());
-
-    final IREdge targetEdge = metricData.left();
+  public PhysicalPlan apply(final PhysicalPlan originalPlan,
+                            final Pair<StageEdge, Map<Integer, Long>> metricData) {
+    final StageEdge targetEdge = metricData.left();
     // Get number of evaluators of the next stage (number of blocks).
     final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
 
@@ -91,9 +82,19 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge, Map<Inte
     for (int i = 0; i < dstParallelism; i++) {
       taskIdxToKeyRange.put(i, keyRanges.get(i));
     }
+
     // Overwrite the previously assigned key range in the physical DAG with the new range.
-    targetEdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
-    return irDAG;
+    final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
+    for (Stage stage : stageDAG.getVertices()) {
+      List<StageEdge> stageEdges = stageDAG.getOutgoingEdgesOf(stage);
+      for (StageEdge edge : stageEdges) {
+        if (edge.equals(targetEdge)) {
+          edge.setTaskIdxToKeyRange(taskIdxToKeyRange);
+        }
+      }
+    }
+
+    return new PhysicalPlan(originalPlan.getId(), stageDAG);
   }
 
   public List<Integer> identifySkewedKeys(final Map<Integer, Long> keyValToPartitionSizeMap) {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 5a8471f..f30403f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -15,11 +15,9 @@
  */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.pass.Pass;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -31,7 +29,7 @@ import java.util.function.BiFunction;
  * @param <T> type of the metric data used for dynamic optimization.
  */
 public abstract class RuntimePass<T> extends Pass
-    implements BiFunction<DAG<IRVertex, IREdge>, T, DAG<IRVertex, IREdge>> {
+    implements BiFunction<PhysicalPlan, T, PhysicalPlan> {
   /**
    * @return the set of event handlers used with the runtime pass.
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 4286bca..30f7111 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -16,7 +16,6 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.Serializable;
@@ -28,7 +27,6 @@ import java.util.Map;
  */
 public final class PhysicalPlan implements Serializable {
   private final String id;
-  private final DAG<IRVertex, IREdge> irDAG;
   private final DAG<Stage, StageEdge> stageDAG;
   private final Map<String, IRVertex> idToIRVertex;
 
@@ -39,10 +37,8 @@ public final class PhysicalPlan implements Serializable {
    * @param stageDAG        the DAG of stages.
    */
   public PhysicalPlan(final String id,
-                      final DAG<IRVertex, IREdge> irDAG,
                       final DAG<Stage, StageEdge> stageDAG) {
     this.id = id;
-    this.irDAG = irDAG;
     this.stageDAG = stageDAG;
 
     idToIRVertex = new HashMap<>();
@@ -74,13 +70,6 @@ public final class PhysicalPlan implements Serializable {
     return idToIRVertex;
   }
 
-  /**
-   * @return IR DAG.
-   */
-  public DAG<IRVertex, IREdge> getIrDAG() {
-    return irDAG;
-  }
-
   @Override
   public String toString() {
     return stageDAG.toString();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 5218b8f..ced7564 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -16,11 +16,17 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
@@ -40,6 +46,11 @@ public final class StageEdge extends RuntimeEdge<Stage> {
   private final IRVertex dstVertex;
 
   /**
+   * The list between the task idx and key range to read.
+   */
+  private Map<Integer, KeyRange> taskIdxToKeyRange;
+
+  /**
    * Value for {@link CommunicationPatternProperty}.
    */
   private final CommunicationPatternProperty.Value dataCommunicationPatternValue;
@@ -71,6 +82,11 @@ public final class StageEdge extends RuntimeEdge<Stage> {
     super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
+    // Initialize the key range of each dst task.
+    this.taskIdxToKeyRange = new HashMap<>();
+    for (int taskIdx = 0; taskIdx < dstStage.getParallelism(); taskIdx++) {
+      taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, false));
+    }
     this.dataCommunicationPatternValue = edgeProperties.get(CommunicationPatternProperty.class)
         .orElseThrow(() -> new RuntimeException(String.format(
             "CommunicationPatternProperty not set for %s", runtimeEdgeId)));
@@ -110,6 +126,52 @@ public final class StageEdge extends RuntimeEdge<Stage> {
   }
 
   /**
+   * @return the list between the task idx and key range to read.
+   */
+  public Map<Integer, KeyRange> getTaskIdxToKeyRange() {
+    return taskIdxToKeyRange;
+  }
+
+  /**
+   * Sets the task idx to key range list.
+   *
+   * @param taskIdxToKeyRange the list to set.
+   */
+  public void setTaskIdxToKeyRange(final Map<Integer, KeyRange> taskIdxToKeyRange) {
+    this.taskIdxToKeyRange = taskIdxToKeyRange;
+  }
+
+  /**
+   * @param edge edge to compare.
+   * @return whether or not the edge has the same itinerary
+   */
+  public Boolean hasSameItineraryAs(final StageEdge edge) {
+    return getSrc().equals(edge.getSrc()) && getDst().equals(edge.getDst());
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    final StageEdge stageEdge = (StageEdge) o;
+    return getExecutionProperties().equals(stageEdge.getExecutionProperties()) && hasSameItineraryAs(stageEdge);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+      .append(getSrc().hashCode())
+      .append(getDst().hashCode())
+      .append(getExecutionProperties())
+      .toHashCode();
+  }
+
+  /**
    * @return {@link CommunicationPatternProperty} value.
    */
   public CommunicationPatternProperty.Value getDataCommunicationPattern() {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index eadd8bf..7637660 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -16,8 +16,6 @@
 package edu.snu.nemo.runtime.executor.datatransfer;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.common.DataSkewMetricFactory;
-import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
@@ -117,9 +115,8 @@ public final class InputReader extends DataTransfer {
     assert (runtimeEdge instanceof StageEdge);
     final Optional<DataStoreProperty.Value> dataStoreProperty
         = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    final DataSkewMetricFactory metricFactory =
-        (DataSkewMetricFactory) runtimeEdge.getExecutionProperties().get(DataSkewMetricProperty.class).get();
-    final KeyRange hashRangeToRead = metricFactory.getMetric().get(dstTaskIndex);
+    ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+    final KeyRange hashRangeToRead = ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
           new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 71e7d3f..243f739 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -19,9 +19,7 @@ import com.google.common.collect.Sets;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
@@ -330,27 +328,22 @@ public final class BatchScheduler implements Scheduler {
     });
   }
 
-  private IREdge getEdgeToOptimize(final String taskId) {
+  private StageEdge getEdgeToOptimize(final String taskId) {
     // Get a stage including the given task
     final Stage stagePutOnHold = physicalPlan.getStageDAG().getVertices().stream()
-        .filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
-        .findFirst()
-        .orElseThrow(() -> new RuntimeException());
+      .filter(stage -> stage.getId().equals(RuntimeIdManager.getStageIdFromTaskId(taskId)))
+      .findFirst()
+      .orElseThrow(() -> new RuntimeException());
 
     // Get outgoing edges of that stage with MetricCollectionProperty
     List<StageEdge> stageEdges = physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
-    IREdge targetEdge = null;
     for (StageEdge edge : stageEdges) {
-      final IRVertex srcIRVertex = edge.getSrcIRVertex();
-      final IRVertex dstIRVertex = edge.getDstIRVertex();
-      targetEdge = physicalPlan.getIrDAG().getEdgeBetween(srcIRVertex.getId(), dstIRVertex.getId());
-      if (MetricCollectionProperty.Value.DataSkewRuntimePass
-          .equals(targetEdge.getPropertyValue(MetricCollectionProperty.class).get())) {
-        break;
+      if (edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {
+        return edge;
       }
     }
 
-    return targetEdge;
+    return null;
   }
 
   /**
@@ -370,7 +363,7 @@ public final class BatchScheduler implements Scheduler {
     final boolean stageComplete =
         planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
-    final IREdge targetEdge = getEdgeToOptimize(taskId);
+    final StageEdge targetEdge = getEdgeToOptimize(taskId);
     if (targetEdge == null) {
       throw new RuntimeException("No edges specified for data skew optimization");
     }
diff --git a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 3c23dcf..3b3646e 100644
--- a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++ b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -95,7 +95,7 @@ public final class TestPlanGenerator {
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
-    return new PhysicalPlan("TestPlan", irDAG, physicalDAG);
+    return new PhysicalPlan("TestPlan", physicalDAG);
   }
 
   /**


Mime
View raw message