nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-64] Fix map stage hang under DataSkewPolicy (#99)
Date Tue, 14 Aug 2018 09:51:45 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new d59373f  [NEMO-64] Fix map stage hang under DataSkewPolicy (#99)
d59373f is described below

commit d59373f59a2b82a170fe581d944364a63e015365
Author: Jeongyoon Eo <jeongyoon.eo@spl.snu.ac.kr>
AuthorDate: Tue Aug 14 18:51:42 2018 +0900

    [NEMO-64] Fix map stage hang under DataSkewPolicy (#99)
    
    JIRA: [NEMO-64: Fix map stage hang under DataSkewPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-64),
    [NEMO-181: Fix DataSkewPolicy bug for multiple shuffles](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-181)
    
    
    **Major changes:**
    - Data for dynamic optimization is aggregated at `DynOptDataHandler` in Scheduler instead of a variable in `MetricCollectionBarrierVertex`. Updating `MetricCollectionBarrierVertex` as data arrives in RuntimeMaster clashed with serializing IRDAG for scheduled Tasks, which caused `ConcurrentModificationException` and silently killed the Scheduler
    - Identifies the target edge to optimize via MetricCollectionProperty in case multiple shuffles are involved.
    
    **Minor changes to note:**
    - Removed now unused entries in `dataSizeMetricMsg`
    - Added `DataSkewMetricProperty` and `MetricFactory` to make task hash ranges of shuffle edges as an execution property, so that RuntimePass can optimize the given IR DAG
    
    **Tests for the changes:**
    - N/A(No new features, covered by `PerKeyMedianITCase`)
    
    **Other comments:**
    - Data for dynamic optimization will be aggregated in designated vertex and not in the RuntimeMaster via upcoming PR for [NEMO-99](https://issues.apache.org/jira/browse/NEMO-99)
    
    resolves [NEMO-64](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-64), [NEMO-181](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-181)
---
 .../main/java/edu/snu/nemo/client/JobLauncher.java |   1 +
 .../edu/snu/nemo/common/DataSkewMetricFactory.java |  36 ++++++++
 .../main/java/edu/snu/nemo/common}/HashRange.java  |  21 +++--
 .../main/java/edu/snu/nemo/common}/KeyRange.java   |   2 +-
 .../java/edu/snu/nemo/common/MetricFactory.java    |  26 ++++++
 .../executionproperty/DataSkewMetricProperty.java  |  43 +++++++++
 .../edu/snu/nemo/common/ir/vertex/IRVertex.java    |   9 ++
 .../nemo/compiler/backend/nemo/NemoBackend.java    |   2 +-
 .../compiletime/annotating/DefaultMetricPass.java  |  58 ++++++++++++
 .../annotating/SkewResourceSkewedDataPass.java     |  10 +-
 .../composite/DefaultCompositePass.java            |   1 +
 .../compiler/optimizer/policy/DataSkewPolicy.java  |   8 --
 .../optimizer/policy/PolicyBuilderTest.java        |   6 +-
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |   1 -
 .../nemo/runtime/common/RuntimeIdGenerator.java    |   3 +-
 .../eventhandler/DynamicOptimizationEvent.java     |  31 ++++---
 .../DynamicOptimizationEventHandler.java           |   9 +-
 .../runtime/common/optimizer/RunTimeOptimizer.java |  40 +++++---
 .../pass/runtime/DataSkewRuntimePass.java          | 101 ++++++++++-----------
 .../common/optimizer/pass/runtime/RuntimePass.java |   7 +-
 .../snu/nemo/runtime/common/plan/PhysicalPlan.java |  11 +++
 .../runtime/common/plan/PhysicalPlanGenerator.java |  13 ++-
 .../snu/nemo/runtime/common/plan/StageEdge.java    |  31 -------
 .../nemo/runtime/common/plan/StagePartitioner.java |   4 +-
 .../edu/snu/nemo/runtime/common/plan/Task.java     |   4 +-
 runtime/common/src/main/proto/ControlMessage.proto |   2 -
 .../pass/runtime/DataSkewRuntimePassTest.java      |   4 +-
 .../runtime/executor/data/BlockManagerWorker.java  |   4 +-
 .../nemo/runtime/executor/data/block/Block.java    |   2 +-
 .../runtime/executor/data/block/FileBlock.java     |   2 +-
 .../data/block/NonSerializedMemoryBlock.java       |   2 +-
 .../executor/data/block/SerializedMemoryBlock.java |   2 +-
 .../runtime/executor/datatransfer/InputReader.java |  14 +--
 .../nemo/runtime/executor/data/BlockStoreTest.java |   4 +-
 .../snu/nemo/runtime/executor/data/BlockTest.java  |   2 +-
 .../executor/datatransfer/DataTransferTest.java    |  21 +++++
 .../runtime/master/DataSkewDynOptDataHandler.java  |  61 +++++++++++++
 .../snu/nemo/runtime/master/DynOptDataHandler.java |  33 +++++++
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  49 +---------
 .../master/resource/ExecutorRepresenter.java       |   2 -
 .../runtime/master/scheduler/BatchScheduler.java   |  79 +++++++++-------
 .../scheduler/NodeShareSchedulingConstraint.java   |   2 +-
 .../scheduler/SchedulingConstraintRegistry.java    |   2 +-
 .../SkewnessAwareSchedulingConstraint.java         |  19 ++--
 .../runtime/master/scheduler/TaskDispatcher.java   |   1 -
 .../SkewnessAwareSchedulingConstraintTest.java     |  59 +++++++-----
 .../runtime/common/plan/TestPlanGenerator.java     |   2 +-
 47 files changed, 558 insertions(+), 288 deletions(-)

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index f0ae953..5950854 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -315,6 +315,7 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
+    cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
     cl.processCommandLine(args);
     return confBuilder.build();
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
new file mode 100644
index 0000000..2847d8d
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.util.Map;
+
+/**
+ * A {@link MetricFactory} which is used for data skew handling.
+ */
+public final class DataSkewMetricFactory implements MetricFactory<Map<Integer, KeyRange>> {
+  private Map<Integer, KeyRange> metric;
+
+  /**
+   * Default constructor.
+   */
+  public DataSkewMetricFactory(final Map<Integer, KeyRange> metric) {
+    this.metric = metric;
+  }
+
+  public Map<Integer, KeyRange> getMetric() {
+    return metric;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java b/common/src/main/java/edu/snu/nemo/common/HashRange.java
similarity index 90%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
rename to common/src/main/java/edu/snu/nemo/common/HashRange.java
index 50e4334..cdc7d49 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/HashRange.java
@@ -13,7 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
+
+import java.util.Arrays;
 
 /**
  * Descriptor for hash range.
@@ -109,22 +111,23 @@ public final class HashRange implements KeyRange<Integer> {
       return false;
     }
     final HashRange hashRange = (HashRange) o;
-    if (rangeBeginInclusive != hashRange.rangeBeginInclusive) {
+    if (rangeBeginInclusive != hashRange.rangeBeginInclusive
+        || rangeEndExclusive != hashRange.rangeEndExclusive
+        || isSkewed != hashRange.isSkewed) {
       return false;
     }
-    return rangeEndExclusive == hashRange.rangeEndExclusive;
+    return true;
   }
 
   @Override
   public int hashCode() {
-    int result = rangeBeginInclusive;
-    result = 31 * result + rangeEndExclusive;
-    return result;
+    return Arrays.hashCode(new Object[] {
+        rangeBeginInclusive,
+        rangeEndExclusive,
+        isSkewed,
+    });
   }
 
-  public void setAsSkewed() {
-    isSkewed = true;
-  }
   public boolean isSkewed() {
     return isSkewed;
   }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
similarity index 97%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
rename to common/src/main/java/edu/snu/nemo/common/KeyRange.java
index 0c46fc5..6fcac36 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
 
 import java.io.Serializable;
 
diff --git a/common/src/main/java/edu/snu/nemo/common/MetricFactory.java b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
new file mode 100644
index 0000000..a98b81b
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.io.Serializable;
+
+/**
+ * A serializable metric factory.
+ *
+ * @param <T> metric type.
+ */
+public interface MetricFactory<T> extends Serializable {
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
new file mode 100644
index 0000000..a5f37a9
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * DataSkewMetric ExecutionProperty.
+ */
+public final class DataSkewMetricProperty extends EdgeExecutionProperty<DataSkewMetricFactory> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private DataSkewMetricProperty(final DataSkewMetricFactory value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static DataSkewMetricProperty of(final DataSkewMetricFactory value) {
+    return new DataSkewMetricProperty(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index 67fc42d..09ebfbf 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -29,6 +29,7 @@ import java.util.Optional;
  */
 public abstract class IRVertex extends Vertex {
   private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
+  private boolean stagePartitioned;
 
   /**
    * Constructor of IRVertex.
@@ -36,6 +37,7 @@ public abstract class IRVertex extends Vertex {
   public IRVertex() {
     super(IdManager.newVertexId());
     this.executionProperties = ExecutionPropertyMap.of(this);
+    this.stagePartitioned = false;
   }
 
   /**
@@ -89,6 +91,13 @@ public abstract class IRVertex extends Vertex {
     return executionProperties;
   }
 
+  public final void setStagePartitioned() {
+    stagePartitioned = true;
+  }
+  public final boolean getStagePartitioned() {
+    return stagePartitioned;
+  }
+
   /**
    * @return IRVertex properties in String form.
    */
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 3e933a2..da7adb6 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -61,6 +61,6 @@ public final class NemoBackend implements Backend<PhysicalPlan> {
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
-    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
+    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), irDAG, stageDAG);
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
new file mode 100644
index 0000000..c96ac0b
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Pass for initiating IREdge Metric ExecutionProperty with default key range.
+ */
+public final class DefaultMetricPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public DefaultMetricPass() {
+    super(DataSkewMetricProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.topologicalDo(dst ->
+      dag.getIncomingEdgesOf(dst).forEach(edge -> {
+        if (CommunicationPatternProperty.Value.Shuffle
+            .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+          final int parallelism = dst.getPropertyValue(ParallelismProperty.class).get();
+          final Map<Integer, KeyRange> metric = new HashMap<>();
+          for (int i = 0; i < parallelism; i++) {
+            metric.put(i, HashRange.of(i, i + 1, false));
+          }
+          edge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+        }
+      }));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index e2ac70f..c5bc1d9 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -53,10 +53,16 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
         .filter(v -> v instanceof MetricCollectionBarrierVertex)
         .forEach(v -> v.setProperty(DynamicOptimizationProperty
             .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+
     dag.getVertices().stream()
         .filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
-            && !v.getPropertyValue(ResourceSkewedDataProperty.class).isPresent())
-        .forEach(v -> v.setProperty(ResourceSkewedDataProperty.of(true)));
+            && !v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
+        .forEach(childV -> {
+          childV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+          dag.getDescendants(childV.getId()).forEach(descendentV -> {
+            descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+          });
+        });
 
     return dag;
   }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
index 4dc16d4..6bb119b 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
@@ -31,6 +31,7 @@ public final class DefaultCompositePass extends CompositePass {
   public DefaultCompositePass() {
     super(Arrays.asList(
         new DefaultParallelismPass(),
+        new DefaultMetricPass(),
         new DefaultEdgeEncoderPass(),
         new DefaultEdgeDecoderPass(),
         new DefaultDataStorePass(),
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index 9d7eb29..acb8d50 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -44,14 +44,6 @@ public final class DataSkewPolicy implements Policy {
     this.policy = BUILDER.build();
   }
 
-  public DataSkewPolicy(final int skewness) {
-    this.policy = new PolicyBuilder()
-        .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(skewness), new SkewCompositePass())
-        .registerCompileTimePass(new LoopOptimizationCompositePass())
-        .registerCompileTimePass(new DefaultCompositePass())
-        .build();
-  }
-
   @Override
   public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
       throws Exception {
diff --git a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
index f75cb8c..79191ba 100644
--- a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -26,19 +26,19 @@ import static org.junit.Assert.assertTrue;
 public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
-    assertEquals(17, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(18, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(0, DisaggregationPolicy.BUILDER.getRuntimePasses().size());
   }
 
   @Test
   public void testTransientResourcePolicy() {
-    assertEquals(19, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(20, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(0, TransientResourcePolicy.BUILDER.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
-    assertEquals(21, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(22, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(1, DataSkewPolicy.BUILDER.getRuntimePasses().size());
   }
 
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 871bf44..d26adaf 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -126,7 +126,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
-<<<<<<< HEAD
    * Max number of attempts for task scheduling.
    */
   @NamedParameter(doc = "Max number of task attempts", short_name = "max_task_attempt", default_value = "1")
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index 2e2e616..049ccbe 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -43,9 +43,10 @@ public final class RuntimeIdGenerator {
    * Generates the ID for physical plan.
    *
    * @return the generated ID
+   * TODO #100: Refactor string-based RuntimeIdGenerator for IR-based DynOpt
    */
   public static String generatePhysicalPlanId() {
-    return "Plan-" + physicalPlanIdGenerator.getAndIncrement();
+    return "Plan-" + physicalPlanIdGenerator.get();
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index 50df799..9a8b26a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -19,7 +19,7 @@
 package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 /**
@@ -27,25 +27,27 @@ import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
  */
 public final class DynamicOptimizationEvent implements RuntimeEvent {
   private final PhysicalPlan physicalPlan;
-  private final MetricCollectionBarrierVertex metricCollectionBarrierVertex;
+  private final Object dynOptData;
   private final String taskId;
   private final String executorId;
+  private final IREdge targetEdge;
 
   /**
    * Default constructor.
    * @param physicalPlan physical plan to be optimized.
-   * @param metricCollectionBarrierVertex metric collection barrier vertex to retrieve metric data from.
    * @param taskId id of the task which triggered the dynamic optimization.
    * @param executorId the id of executor which executes {@code taskId}
    */
   public DynamicOptimizationEvent(final PhysicalPlan physicalPlan,
-                                  final MetricCollectionBarrierVertex metricCollectionBarrierVertex,
+                                  final Object dynOptData,
                                   final String taskId,
-                                  final String executorId) {
+                                  final String executorId,
+                                  final IREdge targetEdge) {
     this.physicalPlan = physicalPlan;
-    this.metricCollectionBarrierVertex = metricCollectionBarrierVertex;
     this.taskId = taskId;
+    this.dynOptData = dynOptData;
     this.executorId = executorId;
+    this.targetEdge = targetEdge;
   }
 
   /**
@@ -56,14 +58,7 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
   }
 
   /**
-   * @return the metric collection barrier vertex for the dynamic optimization.
-   */
-  public MetricCollectionBarrierVertex getMetricCollectionBarrierVertex() {
-    return this.metricCollectionBarrierVertex;
-  }
-
-  /**
-   * @return id of the task which triggered the dynamic optimization
+   * @return id of the task which triggered the dynamic optimization.
    */
   public String getTaskId() {
     return taskId;
@@ -75,4 +70,12 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
   public String getExecutorId() {
     return executorId;
   }
+
+  public Object getDynOptData() {
+    return this.dynOptData;
+  }
+
+  public IREdge getTargetEdge() {
+    return this.targetEdge;
+  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index 56ce7c1..a642248 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -20,7 +20,7 @@ package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.optimizer.RunTimeOptimizer;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.reef.wake.impl.PubSubEventHandler;
@@ -50,11 +50,10 @@ public final class DynamicOptimizationEventHandler implements RuntimeEventHandle
   @Override
   public void onNext(final DynamicOptimizationEvent dynamicOptimizationEvent) {
     final PhysicalPlan physicalPlan = dynamicOptimizationEvent.getPhysicalPlan();
-    final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-            dynamicOptimizationEvent.getMetricCollectionBarrierVertex();
+    final Object dynOptData = dynamicOptimizationEvent.getDynOptData();
+    final IREdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
 
-    final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan,
-        metricCollectionBarrierVertex);
+    final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan, dynOptData, targetEdge);
 
     pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, dynamicOptimizationEvent.getTaskId(),
         dynamicOptimizationEvent.getExecutorId()));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index e251c98..30ad958 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -16,10 +16,17 @@
 package edu.snu.nemo.runtime.common.optimizer;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 
 import java.util.*;
 
@@ -36,24 +43,27 @@ public final class RunTimeOptimizer {
   /**
    * Dynamic optimization method to process the dag with an appropriate pass, decided by the stats.
    * @param originalPlan original physical execution plan.
-   * @param metricCollectionBarrierVertex the vertex that collects metrics and chooses which optimization to perform.
    * @return the newly updated optimized physical plan.
    */
   public static synchronized PhysicalPlan dynamicOptimization(
           final PhysicalPlan originalPlan,
-          final MetricCollectionBarrierVertex metricCollectionBarrierVertex) {
-    final DynamicOptimizationProperty.Value dynamicOptimizationType =
-        metricCollectionBarrierVertex.getPropertyValue(DynamicOptimizationProperty.class).get();
+          final Object dynOptData,
+          final IREdge targetEdge) {
+    try {
+      final PhysicalPlanGenerator physicalPlanGenerator =
+          Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
 
-    switch (dynamicOptimizationType) {
-      case DataSkewRuntimePass:
-        // Metric data for DataSkewRuntimePass is a pair of blockIds and map of hashrange, partition size.
-        final Pair<List<String>, Map<Integer, Long>> metricData =
-            Pair.of(metricCollectionBarrierVertex.getBlockIds(),
-                (Map<Integer, Long>) metricCollectionBarrierVertex.getMetricData());
-        return new DataSkewRuntimePass().apply(originalPlan, metricData);
-      default:
-        throw new UnsupportedOperationException("Unknown runtime pass: " + dynamicOptimizationType);
+      // Data for dynamic optimization used in DataSkewRuntimePass
+      // is a map of <hash value, partition size>.
+      final DAG<IRVertex, IREdge> newIrDAG =
+          new DataSkewRuntimePass()
+              .apply(originalPlan.getIrDAG(), Pair.of(targetEdge, (Map<Integer, Long>) dynOptData));
+      final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(newIrDAG);
+      final PhysicalPlan physicalPlan =
+          new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), newIrDAG, stageDAG);
+      return physicalPlan;
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
     }
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 3218317..67afbad 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,18 +16,18 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.exception.DynamicOptimizationException;
-
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.data.HashRange;
+
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,19 +41,24 @@ import java.util.stream.Collectors;
  * this RuntimePass identifies a number of keys with big partition sizes(skewed key)
  * and evenly redistributes data via overwriting incoming edges of destination tasks.
  */
-public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Map<Integer, Long>>> {
+public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge, Map<Integer, Long>>> {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
   private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
   // Skewed keys denote for top n keys in terms of partition size.
   public static final int DEFAULT_NUM_SKEWED_KEYS = 3;
-  private int numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+  private int numSkewedKeys;
 
   /**
    * Constructor.
    */
   public DataSkewRuntimePass() {
-    this.eventHandlers = Collections.singleton(
-        DynamicOptimizationEventHandler.class);
+    this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
+    this.numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+  }
+
+  public DataSkewRuntimePass(final int numOfSkewedKeys) {
+    this();
+    this.numSkewedKeys = numOfSkewedKeys;
   }
 
   public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
@@ -67,44 +72,32 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Ma
   }
 
   @Override
-  public PhysicalPlan apply(final PhysicalPlan originalPlan,
-                            final Pair<List<String>, Map<Integer, Long>> metricData) {
-    // Builder to create new stages.
-    final DAGBuilder<Stage, StageEdge> physicalDAGBuilder =
-        new DAGBuilder<>(originalPlan.getStageDAG());
-    final List<String> blockIds = metricData.left();
-
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG,
+                                     final Pair<IREdge, Map<Integer, Long>> metricData) {
     // get edges to optimize
-    final List<String> optimizationEdgeIds = blockIds.stream().map(blockId ->
-        RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
-    final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
-    final List<StageEdge> optimizationEdges = stageDAG.getVertices().stream()
-        .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
-        .filter(stageEdge -> optimizationEdgeIds.contains(stageEdge.getId()))
+    final List<IREdge> optimizationEdges = irDAG.getVertices().stream()
+        .flatMap(v -> irDAG.getIncomingEdgesOf(v).stream())
+        .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
+            .equals(e.getPropertyValue(MetricCollectionProperty.class)))
         .collect(Collectors.toList());
 
+    final IREdge targetEdge = metricData.left();
     // Get number of evaluators of the next stage (number of blocks).
-    final Integer numOfDstTasks = optimizationEdges.stream().findFirst().orElseThrow(() ->
-        new RuntimeException("optimization edges are empty")).getDst().getTaskIds().size();
+    final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
 
     // Calculate keyRanges.
-    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), numOfDstTasks);
-
+    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism);
+    final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
+    for (int i = 0; i < dstParallelism; i++) {
+      taskIdxToKeyRange.put(i, keyRanges.get(i));
+    }
     // Overwrite the previously assigned key range in the physical DAG with the new range.
-    optimizationEdges.forEach(optimizationEdge -> {
-      // Update the information.
-      final Map<Integer, KeyRange> taskIdxToHashRange = new HashMap<>();
-      for (int taskIdx = 0; taskIdx < numOfDstTasks; taskIdx++) {
-        taskIdxToHashRange.put(taskIdx, keyRanges.get(taskIdx));
-      }
-      optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
-    });
-
-    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
+    targetEdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
+    return irDAG;
   }
 
   public List<Integer> identifySkewedKeys(final Map<Integer, Long> keyValToPartitionSizeMap) {
-    // Identify skewed keyes.
+    // Identify skewed keys.
     List<Map.Entry<Integer, Long>> sortedMetricData = keyValToPartitionSizeMap.entrySet().stream()
         .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
         .collect(Collectors.toList());
@@ -134,31 +127,31 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Ma
    * to a key range of partitions with approximate size of (total size of partitions / the number of tasks).
    *
    * @param keyToPartitionSizeMap a map of key to partition size.
-   * @param numOfDstTasks the number of tasks that receives this data as input.
+   * @param dstParallelism the number of tasks that receive this data as input.
    * @return the list of key ranges calculated.
    */
   @VisibleForTesting
   public List<KeyRange> calculateKeyRanges(final Map<Integer, Long> keyToPartitionSizeMap,
-                                           final Integer numOfDstTasks) {
-    // Get the biggest key.
-    final int maxKey = keyToPartitionSizeMap.keySet().stream()
+                                           final Integer dstParallelism) {
+    // Get the last key.
+    final int lastKey = keyToPartitionSizeMap.keySet().stream()
         .max(Integer::compareTo)
-        .orElseThrow(() -> new DynamicOptimizationException("Cannot find max key among blocks."));
+        .get();
 
     // Identify skewed keys, which is top numSkewedKeys number of keys.
     List<Integer> skewedKeys = identifySkewedKeys(keyToPartitionSizeMap);
 
     // Calculate the ideal size for each destination task.
     final Long totalSize = keyToPartitionSizeMap.values().stream().mapToLong(n -> n).sum(); // get total size
-    final Long idealSizePerTask = totalSize / numOfDstTasks; // and derive the ideal size per task
+    final Long idealSizePerTask = totalSize / dstParallelism; // and derive the ideal size per task
 
-    final List<KeyRange> keyRanges = new ArrayList<>(numOfDstTasks);
+    final List<KeyRange> keyRanges = new ArrayList<>(dstParallelism);
     int startingKey = 0;
     int finishingKey = 1;
     Long currentAccumulatedSize = keyToPartitionSizeMap.getOrDefault(startingKey, 0L);
     Long prevAccumulatedSize = 0L;
-    for (int i = 1; i <= numOfDstTasks; i++) {
-      if (i != numOfDstTasks) {
+    for (int i = 1; i <= dstParallelism; i++) {
+      if (i != dstParallelism) {
         // Ideal accumulated partition size for this task.
         final Long idealAccumulatedSize = idealSizePerTask * i;
         // By adding partition sizes, find the accumulated size nearest to the given ideal size.
@@ -185,15 +178,15 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Ma
         prevAccumulatedSize = currentAccumulatedSize;
         startingKey = finishingKey;
       } else { // last one: we put the range of the rest.
-        boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, finishingKey);
+        boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, lastKey + 1);
         keyRanges.add(i - 1,
-            HashRange.of(startingKey, maxKey + 1, isSkewedKey));
+            HashRange.of(startingKey, lastKey + 1, isSkewedKey));
 
-        while (finishingKey <= maxKey) {
+        while (finishingKey <= lastKey) {
           currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
           finishingKey++;
         }
-        LOG.debug("KeyRange {}~{}, Size {}", startingKey, maxKey + 1,
+        LOG.debug("KeyRange {}~{}, Size {}", startingKey, lastKey + 1,
             currentAccumulatedSize - prevAccumulatedSize);
       }
     }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 249a239..5a8471f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -15,9 +15,11 @@
  */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
+import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.pass.Pass;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -28,7 +30,8 @@ import java.util.function.BiFunction;
  * after dynamic optimization.
  * @param <T> type of the metric data used for dynamic optimization.
  */
-public abstract class RuntimePass<T> extends Pass implements BiFunction<PhysicalPlan, T, PhysicalPlan> {
+public abstract class RuntimePass<T> extends Pass
+    implements BiFunction<DAG<IRVertex, IREdge>, T, DAG<IRVertex, IREdge>> {
   /**
    * @return the set of event handlers used with the runtime pass.
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 30f7111..4286bca 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.Serializable;
@@ -27,6 +28,7 @@ import java.util.Map;
  */
 public final class PhysicalPlan implements Serializable {
   private final String id;
+  private final DAG<IRVertex, IREdge> irDAG;
   private final DAG<Stage, StageEdge> stageDAG;
   private final Map<String, IRVertex> idToIRVertex;
 
@@ -37,8 +39,10 @@ public final class PhysicalPlan implements Serializable {
    * @param stageDAG        the DAG of stages.
    */
   public PhysicalPlan(final String id,
+                      final DAG<IRVertex, IREdge> irDAG,
                       final DAG<Stage, StageEdge> stageDAG) {
     this.id = id;
+    this.irDAG = irDAG;
     this.stageDAG = stageDAG;
 
     idToIRVertex = new HashMap<>();
@@ -70,6 +74,13 @@ public final class PhysicalPlan implements Serializable {
     return idToIRVertex;
   }
 
+  /**
+   * @return IR DAG.
+   */
+  public DAG<IRVertex, IREdge> getIrDAG() {
+    return irDAG;
+  }
+
   @Override
   public String toString() {
     return stageDAG.toString();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index ca4124b..16f4bc4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -34,6 +34,8 @@ import edu.snu.nemo.common.exception.PhysicalPlanGenerationException;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.util.*;
@@ -45,6 +47,7 @@ import java.util.function.Function;
 public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
   private final String dagDirectory;
   private final StagePartitioner stagePartitioner;
+  private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanGenerator.class.getName());
 
   /**
    * Private constructor.
@@ -147,7 +150,7 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
 
       final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
 
-      // Prepare vertexIdtoReadables
+      // Prepare vertexIdToReadables
       final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
         vertexIdToReadables.add(new HashMap<>());
@@ -156,7 +159,7 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
       // For each IRVertex,
       for (final IRVertex irVertex : stageVertices) {
         // Take care of the readables of a source vertex.
-        if (irVertex instanceof SourceVertex) {
+        if (irVertex instanceof SourceVertex && !irVertex.getStagePartitioned()) {
           final SourceVertex sourceVertex = (SourceVertex) irVertex;
           try {
             final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
@@ -200,6 +203,12 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
         dagOfStagesBuilder.addVertex(stage);
         stageIdToStageMap.put(stageId, stage);
       }
+
+      // To prevent re-fetching readables in source vertex
+      // during re-generation of physical plan for dynamic optimization.
+      for (IRVertex irVertex : stageVertices) {
+        irVertex.setStagePartitioned();
+      }
     }
 
     // Add StageEdges
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 98773ea..5218b8f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -21,11 +21,6 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.data.HashRange;
-
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
@@ -45,11 +40,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
   private final IRVertex dstVertex;
 
   /**
-   * The list between the task idx and key range to read.
-   */
-  private Map<Integer, KeyRange> taskIdxToKeyRange;
-
-  /**
    * Value for {@link CommunicationPatternProperty}.
    */
   private final CommunicationPatternProperty.Value dataCommunicationPatternValue;
@@ -81,11 +71,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
     super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
-    // Initialize the key range of each dst task.
-    this.taskIdxToKeyRange = new HashMap<>();
-    for (int taskIdx = 0; taskIdx < dstStage.getTaskIds().size(); taskIdx++) {
-      taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, false));
-    }
     this.dataCommunicationPatternValue = edgeProperties.get(CommunicationPatternProperty.class)
         .orElseThrow(() -> new RuntimeException(String.format(
             "CommunicationPatternProperty not set for %s", runtimeEdgeId)));
@@ -125,22 +110,6 @@ public final class StageEdge extends RuntimeEdge<Stage> {
   }
 
   /**
-   * @return the list between the task idx and key range to read.
-   */
-  public Map<Integer, KeyRange> getTaskIdxToKeyRange() {
-    return taskIdxToKeyRange;
-  }
-
-  /**
-   * Sets the task idx to key range list.
-   *
-   * @param taskIdxToKeyRange the list to set.
-   */
-  public void setTaskIdxToKeyRange(final Map<Integer, KeyRange> taskIdxToKeyRange) {
-    this.taskIdxToKeyRange = taskIdxToKeyRange;
-  }
-
-  /**
    * @return {@link CommunicationPatternProperty} value.
    */
   public CommunicationPatternProperty.Value getDataCommunicationPattern() {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index abe0a8f..88ef432 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -83,7 +83,7 @@ public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, M
           continue;
         }
         // Assign stageId
-        if (testMergability(edge, irDAG)) {
+        if (testMergeability(edge, irDAG)) {
           vertexToStageIdMap.put(connectedIRVertex, stageId);
         } else {
           vertexToStageIdMap.put(connectedIRVertex, nextStageIndex.getValue());
@@ -99,7 +99,7 @@ public final class StagePartitioner implements Function<DAG<IRVertex, IREdge>, M
    * @param dag IR DAG which contains {@code edge}
    * @return {@code true} if and only if the source and the destination vertex of the edge can be merged into one stage.
    */
-  private boolean testMergability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
+  private boolean testMergeability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
     // If the destination vertex has multiple inEdges, return false
     if (dag.getIncomingEdgesOf(edge.getDst()).size() > 1) {
       return false;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 0cfe863..3e95830 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -20,9 +20,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 /**
  * A Task is a self-contained executable that can be executed on a machine.
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 59a3212..911990e 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -117,8 +117,6 @@ message BlockStateChangedMsg {
 message DataSizeMetricMsg {
     // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
     repeated PartitionSizeEntry partitionSize = 1;
-    required string blockId = 2;
-    required string srcIRVertexId = 3;
 }
 
 message PartitionSizeEntry {
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 319b8ba..04fca2b 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -15,8 +15,8 @@
  */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index ba0a71c..4cf0b12 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataPersistenceProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.comm.ControlMessage.ByteTransferContextDescriptor;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -296,8 +296,6 @@ public final class BlockManagerWorker {
               .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
               .setType(ControlMessage.MessageType.DataSizeMetric)
               .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
-                  .setBlockId(blockId)
-                  .setSrcIRVertexId(srcIRVertexId)
                   .addAllPartitionSize(partitionSizeEntries)
               )
               .build());
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index e0de210..960f2c5 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 6eef824..be42f6b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.Partition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 5bf1e01..7722f2d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 847558f..03a7ab1 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.executor.data.block;
 
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 4471ae4..9870259 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -16,6 +16,8 @@
 package edu.snu.nemo.runtime.executor.datatransfer;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
@@ -23,16 +25,14 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyV
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
-import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -45,7 +45,6 @@ import java.util.stream.StreamSupport;
  * Represents the input data transfer to a task.
  */
 public final class InputReader extends DataTransfer {
-  private static final Logger LOG = LoggerFactory.getLogger(InputReader.class.getName());
   private final int dstTaskIndex;
   private final BlockManagerWorker blockManagerWorker;
 
@@ -118,8 +117,9 @@ public final class InputReader extends DataTransfer {
     assert (runtimeEdge instanceof StageEdge);
     final Optional<DataStoreProperty.Value> dataStoreProperty
         = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    final KeyRange hashRangeToRead =
-        ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+    final DataSkewMetricFactory metricFactory =
+        (DataSkewMetricFactory) runtimeEdge.getExecutionProperties().get(DataSkewMetricProperty.class).get();
+    final KeyRange hashRangeToRead = metricFactory.getMetric().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
           new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 8f1b7e5..61adee2 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -20,8 +20,8 @@ import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
index c05745d..46c6ce9 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
@@ -15,9 +15,9 @@
  */
 package edu.snu.nemo.runtime.executor.data;
 
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.common.coder.IntDecoderFactory;
 import edu.snu.nemo.common.coder.IntEncoderFactory;
-import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.block.FileBlock;
 import edu.snu.nemo.runtime.executor.data.block.NonSerializedMemoryBlock;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c755543..229f30d 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,6 +15,9 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -300,6 +303,15 @@ public final class DataTransferTest {
     dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
     dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
+    if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+        .equals(CommunicationPatternProperty.Value.Shuffle)) {
+      final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+      final Map<Integer, KeyRange> metric = new HashMap<>();
+      for (int i = 0; i < parallelism; i++) {
+        metric.put(i, HashRange.of(i, i + 1, false));
+      }
+      dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+    }
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
     final RuntimeEdge dummyEdge;
 
@@ -383,6 +395,15 @@ public final class DataTransferTest {
         = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
     duplicateDataProperty.get().setGroupSize(2);
+    if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+        .equals(CommunicationPatternProperty.Value.Shuffle)) {
+      final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+      final Map<Integer, KeyRange> metric = new HashMap<>();
+      for (int i = 0; i < parallelism; i++) {
+        metric.put(i, HashRange.of(i, i + 1, false));
+      }
+      dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+    }
     dummyIREdge.setProperty(DataStoreProperty.of(store));
     dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     final RuntimeEdge dummyEdge, dummyEdge2;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
new file mode 100644
index 0000000..47671ed
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handler for aggregating data used in data skew dynamic optimization.
+ */
+public class DataSkewDynOptDataHandler implements DynOptDataHandler {
+  private final Map<Integer, Long> aggregatedDynOptData;
+
+  public DataSkewDynOptDataHandler() {
+    this.aggregatedDynOptData = new HashMap<>();
+  }
+
+  /**
+   * Updates data for dynamic optimization sent from Tasks.
+   * @param dynOptData data used for data skew dynamic optimization.
+   */
+  @Override
+  public final void updateDynOptData(final Object dynOptData) {
+    List<ControlMessage.PartitionSizeEntry> partitionSizeInfo
+        = (List<ControlMessage.PartitionSizeEntry>) dynOptData;
+    partitionSizeInfo.forEach(partitionSizeEntry -> {
+      final int hashIndex = partitionSizeEntry.getKey();
+      final long partitionSize = partitionSizeEntry.getSize();
+      if (aggregatedDynOptData.containsKey(hashIndex)) {
+        aggregatedDynOptData.compute(hashIndex, (originalKey, originalValue) -> originalValue + partitionSize);
+      } else {
+        aggregatedDynOptData.put(hashIndex, partitionSize);
+      }
+    });
+  }
+
+  /**
+   * Returns aggregated data for dynamic optimization.
+   * @return aggregated data used for data skew dynamic optimization.
+   */
+  @Override
+  public final Object getDynOptData() {
+    return aggregatedDynOptData;
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
new file mode 100644
index 0000000..66fa310
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+/**
+ * Handler for aggregating data used in dynamic optimization.
+ */
+public interface DynOptDataHandler {
+  /**
+   * Updates data for dynamic optimization sent from Tasks.
+   * @param dynOptData data used for dynamic optimization.
+   */
+  void updateDynOptData(Object dynOptData);
+
+  /**
+   * Returns aggregated data for dynamic optimization.
+   * @return aggregated data used for dynamic optimization.
+   */
+  Object getDynOptData();
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 41ad243..d1fa2df 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -19,13 +19,13 @@ import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.scheduler.BatchScheduler;
 import edu.snu.nemo.runtime.master.servlet.*;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -82,8 +82,6 @@ public final class RuntimeMaster {
   private final MetricMessageHandler metricMessageHandler;
   private final MessageEnvironment masterMessageEnvironment;
   private final MetricStore metricStore;
-  private final Map<Integer, Long> aggregatedMetricData;
-  private final ExecutorService metricAggregationService;
   private final ClientRPC clientRPC;
   private final MetricManagerMaster metricManagerMaster;
   // For converting json data. This is a thread safe.
@@ -123,8 +121,6 @@ public final class RuntimeMaster {
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
     this.objectMapper = new ObjectMapper();
-    this.aggregatedMetricData = new ConcurrentHashMap<>();
-    this.metricAggregationService = Executors.newFixedThreadPool(10);
     this.metricStore = MetricStore.getStore();
     this.metricServer = startRestMetricServer();
   }
@@ -343,10 +339,8 @@ public final class RuntimeMaster {
         LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
         throw new RuntimeException(exception);
       case DataSizeMetric:
-        final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = message.getDataSizeMetricMsg();
         // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
-        accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeList(),
-            dataSizeMetricMsg.getSrcIRVertexId(), dataSizeMetricMsg.getBlockId());
+        ((BatchScheduler) scheduler).updateDynOptData(message.getDataSizeMetricMsg().getPartitionSizeList());
         break;
       case MetricMessageReceived:
         final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
@@ -371,45 +365,6 @@ public final class RuntimeMaster {
     }
   }
 
-  /**
-   * Accumulates the metric data for a barrier vertex.
-   * TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
-   * TODO #98: Implement MetricVertex that collect metric used for dynamic optimization.
-   *
-   * @param partitionSizeInfo the size of partitions in a block to accumulate.
-   * @param srcVertexId       the ID of the source vertex.
-   * @param blockId           the ID of the block.
-   */
-  private void accumulateBarrierMetric(final List<ControlMessage.PartitionSizeEntry> partitionSizeInfo,
-                                       final String srcVertexId,
-                                       final String blockId) {
-    final IRVertex vertexToSendMetricDataTo = irVertices.stream()
-        .filter(irVertex -> irVertex.getId().equals(srcVertexId)).findFirst()
-        .orElseThrow(() -> new RuntimeException(srcVertexId + " doesn't exist in the submitted Physical Plan"));
-
-    if (vertexToSendMetricDataTo instanceof MetricCollectionBarrierVertex) {
-      final MetricCollectionBarrierVertex<Integer, Long> metricCollectionBarrierVertex =
-          (MetricCollectionBarrierVertex) vertexToSendMetricDataTo;
-
-      metricCollectionBarrierVertex.addBlockId(blockId);
-      metricAggregationService.submit(() -> {
-        // For each hash range index, we aggregate the metric data.
-        partitionSizeInfo.forEach(partitionSizeEntry -> {
-          final int key = partitionSizeEntry.getKey();
-          final long size = partitionSizeEntry.getSize();
-          if (aggregatedMetricData.containsKey(key)) {
-            aggregatedMetricData.compute(key, (existKey, existValue) -> existValue + size);
-          } else {
-            aggregatedMetricData.put(key, size);
-          }
-        });
-        metricCollectionBarrierVertex.setMetricData(aggregatedMetricData);
-      });
-    } else {
-      throw new RuntimeException("Something wrong happened at SkewCompositePass.");
-    }
-  }
-
   private static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
     switch (state) {
       case READY:
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index d0e21d8..d5c2e89 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -44,7 +44,6 @@ import java.util.stream.Stream;
  */
 @NotThreadSafe
 public final class ExecutorRepresenter {
-
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
   private final Map<String, Task> runningComplyingTasks;
@@ -107,7 +106,6 @@ public final class ExecutorRepresenter {
         ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
     runningTaskToAttempt.put(task, task.getAttemptIdx());
     failedTasks.remove(task);
-
     serializationExecutorService.submit(() -> {
       final byte[] serialized = SerializationUtils.serialize(task);
       sendControlMessage(
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 24c0e43..77881c0 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -17,18 +17,20 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.collect.Sets;
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
 import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler;
+import edu.snu.nemo.runtime.master.DynOptDataHandler;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.common.exception.*;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.PlanStateManager;
@@ -75,6 +77,7 @@ public final class BatchScheduler implements Scheduler {
   private PhysicalPlan physicalPlan;
   private PlanStateManager planStateManager;
   private List<List<Stage>> sortedScheduleGroups;
+  private List<DynOptDataHandler> dynOptDataHandlers;
 
   @Inject
   private BatchScheduler(final TaskDispatcher taskDispatcher,
@@ -93,6 +96,8 @@ public final class BatchScheduler implements Scheduler {
           .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
     }
     this.executorRegistry = executorRegistry;
+    this.dynOptDataHandlers = new ArrayList<>();
+    dynOptDataHandlers.add(new DataSkewDynOptDataHandler());
   }
 
   /**
@@ -130,14 +135,12 @@ public final class BatchScheduler implements Scheduler {
    * Handles task state transition notifications sent from executors.
    * Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events.
    * We ignore such late-arriving notifications, and only handle notifications for the current task attempt.
-   *
    * @param executorId the id of the executor where the message was sent from.
    * @param taskId whose state has changed
    * @param taskAttemptIndex of the task whose state has changed
    * @param newState the state to change to
    * @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise.
    */
-  @Override
   public void onTaskStateReportFromExecutor(final String executorId,
                                             final String taskId,
                                             final int taskAttemptIndex,
@@ -158,7 +161,7 @@ public final class BatchScheduler implements Scheduler {
           onTaskExecutionFailedRecoverable(executorId, taskId, failureCause);
           break;
         case ON_HOLD:
-          onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
+          onTaskExecutionOnHold(executorId, taskId);
           break;
         case FAILED:
           throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #")
@@ -364,15 +367,34 @@ public final class BatchScheduler implements Scheduler {
     });
   }
 
+  public IREdge getEdgeToOptimize(final String taskId) {
+    // Get a stage including the given task
+    final Stage stagePutOnHold = physicalPlan.getStageDAG().getVertices().stream()
+        .filter(stage -> stage.getTaskIds().contains(taskId)).findFirst().get();
+
+    // Get outgoing edges of that stage with MetricCollectionProperty
+    List<StageEdge> stageEdges = physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
+    IREdge targetEdge = null;
+    for (StageEdge edge : stageEdges) {
+      final IRVertex srcIRVertex = edge.getSrcIRVertex();
+      final IRVertex dstIRVertex = edge.getDstIRVertex();
+      targetEdge = physicalPlan.getIrDAG().getEdgeBetween(srcIRVertex.getId(), dstIRVertex.getId());
+      if (MetricCollectionProperty.Value.DataSkewRuntimePass
+          .equals(targetEdge.getPropertyValue(MetricCollectionProperty.class).get())) {
+        break;
+      }
+    }
+
+    return targetEdge;
+  }
+
   /**
    * Action for after task execution is put on hold.
    * @param executorId       the ID of the executor.
    * @param taskId           the ID of the task.
-   * @param vertexPutOnHold  the ID of vertex that is put on hold.
    */
   private void onTaskExecutionOnHold(final String executorId,
-                                     final String taskId,
-                                     final String vertexPutOnHold) {
+                                     final String taskId) {
     LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
     executorRegistry.updateExecutor(executorId, (executor, state) -> {
       executor.onTaskExecutionComplete(taskId);
@@ -383,21 +405,18 @@ public final class BatchScheduler implements Scheduler {
     final boolean stageComplete =
         planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
+    final IREdge targetEdge = getEdgeToOptimize(taskId);
+    if (targetEdge == null) {
+      throw new RuntimeException("No edges specified for data skew optimization");
+    }
+
     if (stageComplete) {
-      // get optimization vertex from the task.
-      final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-          getVertexDagById(taskId).getVertices().stream() // get vertex list
-              .filter(irVertex -> irVertex.getId().equals(vertexPutOnHold)) // find it
-              .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
-              .distinct()
-              .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
-              .findFirst().orElseThrow(() -> new RuntimeException(TaskState.State.ON_HOLD.name() // get it
-              + " called with failed task ids by some other task than "
-              + MetricCollectionBarrierVertex.class.getSimpleName()));
-      // and we will use this vertex to perform metric collection and dynamic optimization.
-
-      pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
-          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId));
+      final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
+          .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+          .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+      pubSubEventHandlerWrapper.getPubSubEventHandler()
+          .onNext(new DynamicOptimizationEvent(physicalPlan, dynOptDataHandler.getDynOptData(),
+              taskId, executorId, targetEdge));
     }
   }
 
@@ -478,16 +497,10 @@ public final class BatchScheduler implements Scheduler {
         .collect(Collectors.toSet());
   }
 
-  /**
-   * @param taskId id of the task
-   * @return the IR dag
-   */
-  private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
-    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
-      if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
-        return stage.getIRDAG();
-      }
-    }
-    throw new RuntimeException("This taskId does not exist in the plan");
+  public void updateDynOptData(final Object dynOptData) {
+    final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
+        .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+        .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+    dynOptDataHandler.updateDynOptData(dynOptData);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index ff3986e..a2c0935 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -51,7 +51,7 @@ public final class NodeShareSchedulingConstraint implements SchedulingConstraint
   @Override
   public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
     final Map<String, Integer> propertyValue = task.getPropertyValue(ResourceSiteProperty.class)
-            .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected"));
+        .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected"));
     if (propertyValue.isEmpty()) {
       return true;
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 4a774bf..50dd8d5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -45,8 +45,8 @@ public final class SchedulingConstraintRegistry {
     registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
     registerSchedulingConstraint(freeSlotSchedulingConstraint);
     registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
-    registerSchedulingConstraint(nodeShareSchedulingConstraint);
     registerSchedulingConstraint(skewnessAwareSchedulingConstraint);
+    registerSchedulingConstraint(nodeShareSchedulingConstraint);
   }
 
   /**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
index 236453f..6e5ba4c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
@@ -16,11 +16,13 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -28,6 +30,7 @@ import org.apache.reef.annotations.audience.DriverSide;
 
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
+import java.util.Map;
 
 /**
  * This policy aims to distribute partitions with skewed keys to different executors.
@@ -36,7 +39,6 @@ import javax.inject.Inject;
 @DriverSide
 @AssociatedProperty(ResourceSkewedDataProperty.class)
 public final class SkewnessAwareSchedulingConstraint implements SchedulingConstraint {
-
   @VisibleForTesting
   @Inject
   public SkewnessAwareSchedulingConstraint() {
@@ -45,9 +47,14 @@ public final class SkewnessAwareSchedulingConstraint implements SchedulingConstr
   public boolean hasSkewedData(final Task task) {
     final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId());
     for (StageEdge inEdge : task.getTaskIncomingEdges()) {
-      final KeyRange hashRange = inEdge.getTaskIdxToKeyRange().get(taskIdx);
-      if (((HashRange) hashRange).isSkewed()) {
-        return true;
+      if (CommunicationPatternProperty.Value.Shuffle
+      .equals(inEdge.getDataCommunicationPattern())) {
+        final Map<Integer, KeyRange> taskIdxToKeyRange =
+            inEdge.getPropertyValue(DataSkewMetricProperty.class).get().getMetric();
+        final KeyRange hashRange = taskIdxToKeyRange.get(taskIdx);
+        if (((HashRange) hashRange).isSkewed()) {
+          return true;
+        }
       }
     }
     return false;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
index 6c0222c..e6feaa5 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -131,7 +131,6 @@ final class TaskDispatcher {
           planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
 
           LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
-
           // send the task
           selectedExecutor.onTaskScheduled(task);
         } else {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
index 87c933f..e3439d6 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
@@ -15,9 +15,16 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -36,27 +43,36 @@ import static org.mockito.Mockito.when;
  * Test cases for {@link SkewnessAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, Task.class, HashRange.class, StageEdge.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Stage.class, HashRange.class,
+IRVertex.class, IREdge.class})
 public final class SkewnessAwareSchedulingConstraintTest {
 
-  private static StageEdge mockStageEdge() {
+  private static StageEdge mockStageEdge(final int numSkewedHashRange,
+                                         final int numTotalHashRange) {
     final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
 
-    final HashRange skewedHashRange1 = mock(HashRange.class);
-    when(skewedHashRange1.isSkewed()).thenReturn(true);
-    final HashRange skewedHashRange2 = mock(HashRange.class);
-    when(skewedHashRange2.isSkewed()).thenReturn(true);
-    final HashRange hashRange = mock(HashRange.class);
-    when(hashRange.isSkewed()).thenReturn(false);
+    for (int taskIdx = 0; taskIdx < numTotalHashRange; taskIdx++) {
+      final HashRange hashRange = mock(HashRange.class);
+      if (taskIdx < numSkewedHashRange) {
+        when(hashRange.isSkewed()).thenReturn(true);
+      } else {
+        when(hashRange.isSkewed()).thenReturn(false);
+      }
+      taskIdxToKeyRange.put(taskIdx, hashRange);
+    }
 
-    taskIdxToKeyRange.put(0, skewedHashRange1);
-    taskIdxToKeyRange.put(1, skewedHashRange2);
-    taskIdxToKeyRange.put(2, hashRange);
+    final IRVertex srcMockVertex = mock(IRVertex.class);
+    final IRVertex dstMockVertex = mock(IRVertex.class);
+    final Stage srcMockStage = mock(Stage.class);
+    final Stage dstMockStage = mock(Stage.class);
 
-    final StageEdge inEdge = mock(StageEdge.class);
-    when(inEdge.getTaskIdxToKeyRange()).thenReturn(taskIdxToKeyRange);
+    final IREdge dummyIREdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, srcMockVertex, dstMockVertex);
+    dummyIREdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+    dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
+    final StageEdge dummyEdge = new StageEdge("Edge-0", dummyIREdge.getExecutionProperties(),
+        srcMockVertex, dstMockVertex, srcMockStage, dstMockStage, false);
 
-    return inEdge;
+    return dummyEdge;
   }
 
   private static Task mockTask(final int taskIdx, final List<StageEdge> inEdges) {
@@ -81,11 +97,12 @@ public final class SkewnessAwareSchedulingConstraintTest {
   @Test
   public void testScheduleSkewedTasks() {
     final SchedulingConstraint schedulingConstraint = new SkewnessAwareSchedulingConstraint();
-    final StageEdge inEdge = mockStageEdge();
-    final Task task0 = mockTask(0, Arrays.asList(inEdge));
-    final Task task1 = mockTask(1, Arrays.asList(inEdge));
-    final Task task2 = mockTask(2, Arrays.asList(inEdge));
-    final ExecutorRepresenter e0 = mockExecutorRepresenter(task0);
+    // Create a StageEdge where two out of three are skewed hash ranges.
+    final StageEdge inEdge = mockStageEdge(2, 3);
+    final Task task0 = mockTask(0, Arrays.asList(inEdge));  // skewed task
+    final Task task1 = mockTask(1, Arrays.asList(inEdge));  // skewed task
+    final Task task2 = mockTask(2, Arrays.asList(inEdge));  // non-skewed task
+    final ExecutorRepresenter e0 = mockExecutorRepresenter(task0);  // schedule skewed task to e0
 
     assertEquals(true, schedulingConstraint.testSchedulability(e0, task2));
     assertEquals(false, schedulingConstraint.testSchedulability(e0, task1));
diff --git a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 3b3646e..3c23dcf 100644
--- a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++ b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -95,7 +95,7 @@ public final class TestPlanGenerator {
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
-    return new PhysicalPlan("TestPlan", physicalDAG);
+    return new PhysicalPlan("TestPlan", irDAG, physicalDAG);
   }
 
   /**


Mime
View raw message