nemo-commits mailing list archives

From sa...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-26] Implement SourceLocationAwareSchedulingPolicy (#1)
Date Thu, 08 Mar 2018 07:58:34 GMT
This is an automated email from the ASF dual-hosted git repository.

sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d8b9c3  [NEMO-26] Implement SourceLocationAwareSchedulingPolicy (#1)
7d8b9c3 is described below

commit 7d8b9c366db52a28805dd4591d1664115d121d26
Author: JangHo Seo <jangho@jangho.io>
AuthorDate: Thu Mar 8 16:58:32 2018 +0900

    [NEMO-26] Implement SourceLocationAwareSchedulingPolicy (#1)
    
    JIRA: [NEMO-26: Implement SourceLocationAwareSchedulingPolicy](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-26)
    
    **Major changes:**
    
    - Implemented SourceLocationAwareSchedulingPolicy
        - This scheduling policy schedules TaskGroups with Readables to one of the executors that hold the corresponding input split (see the first sketch after this list).
        - For TaskGroups without any Readables, it falls back to RoundRobinSchedulingPolicy.
        - Added unit tests for SourceLocationAwareSchedulingPolicy.
    - Refactored the scheduling thread model on the runtime master side (thanks to @johnyangk)
        - Made only two threads call scheduling-related code: the RuntimeMaster thread (RMT) and the SchedulerThread (ST).
        - RMT and ST meet at only two points: SchedulingPolicy and PendingTaskGroupQueue, both synchronized (ThreadSafe); see the second sketch below.
        - Other scheduler-related classes that are accessed by only one of the two threads are not synchronized (NotThreadSafe).
        - We use ContainerManager for handling containers only, not executors (to avoid hanging, we also don't wait for pending resource requests upon termination).
        - ExecutorRegistry and ExecutorRepresenter fully classify running/failed/completed Executors/TaskGroups.
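
    A minimal sketch of the scheduling decision described above, not the committed SourceLocationAwareSchedulingPolicy itself: it only assumes the Readable#getLocations() and ExecutorRepresenter#getNodeName() methods added in this commit, and the class and method names below are illustrative.

    ```java
    import edu.snu.nemo.common.ir.Readable;
    import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;

    import java.util.Collection;
    import java.util.List;
    import java.util.Optional;

    // Hypothetical helper: prefer an executor whose node hosts one of the readable's input splits.
    final class SourceLocationAwareSketch {
      static Optional<ExecutorRepresenter> selectBySourceLocation(final Collection<ExecutorRepresenter> executors,
                                                                  final Readable<?> readable) {
        try {
          final List<String> locations = readable.getLocations(); // may throw for location-less sources
          return executors.stream()
              .filter(executor -> locations.contains(executor.getNodeName()))
              .findAny();
        } catch (final UnsupportedOperationException e) {
          // Sources that cannot report locations (e.g. InitializedSourceVertex) get no preference here;
          // the caller would fall back to RoundRobinSchedulingPolicy instead.
          return Optional.empty();
        } catch (final Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
    ```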
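
    The single-thread funnel itself is sketched below; it mirrors the runtimeMasterThread that this commit introduces in RuntimeMaster, while the two handler methods are hypothetical and only illustrate how fire-and-forget and blocking events reach the RMT.

    ```java
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Sketch of the RuntimeMaster-thread (RMT) pattern: all externally triggered handlers run on
    // one named thread, so master-side scheduler state needs no additional locking.
    final class RuntimeMasterThreadSketch {
      private final ExecutorService runtimeMasterThread =
          Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster"));

      /** Fire-and-forget events (e.g. incoming control messages) are simply executed on the RMT. */
      void onControlMessage(final Runnable handleMessage) {
        runtimeMasterThread.execute(handleMessage);
      }

      /** Events that need a result block the caller until the RMT has processed them. */
      <T> T runOnMasterThread(final Callable<T> event) {
        try {
          return runtimeMasterThread.submit(event).get();
        } catch (final Exception e) {
          throw new RuntimeException(e);
        }
      }
    }
    ```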
    
    **Minor changes to note:**
    - None
    
    **Tests for the changes:**
    
    - Implemented SourceLocationAwareSchedulingPolicy
        - Added SourceLocationAwareSchedulingPolicyTest
    - Refactored container management and scheduling on the runtime master side (thanks to @johnyangk)
        - ContainerManagerTest covers the changes
    
    **Other comments:**
    - None
---
 .../main/java/edu/snu/nemo/common/ir/Readable.java |  10 +
 .../common/ir/vertex/InitializedSourceVertex.java  |   7 +-
 compiler/frontend/beam/pom.xml                     |  11 +
 .../beam/source/BeamBoundedSourceVertex.java       |  19 +-
 .../spark/source/SparkBoundedSourceVertex.java     |   5 +
 .../optimizer/examples/EmptyComponents.java        |   4 +
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |   2 +-
 examples/resources/sample_executor_resources.json  |   6 +-
 .../common/message/ncs/NcsMessageEnvironment.java  |   1 -
 .../common/plan/physical/BoundedSourceTask.java    |   3 +-
 runtime/common/src/main/proto/ControlMessage.proto |   3 +-
 .../edu/snu/nemo/driver/UserApplicationRunner.java |  17 +-
 .../nemo/runtime/master/BlockManagerMaster.java    |  53 ++-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java | 175 +++-----
 .../runtime/master/resource/ContainerManager.java  | 198 +++------
 .../master/resource/ExecutorRepresenter.java       |  74 +++-
 .../master/scheduler/BatchSingleJobScheduler.java  | 311 ++++++-------
 .../runtime/master/scheduler/ExecutorRegistry.java | 204 +++++++++
 .../scheduler/RoundRobinSchedulingPolicy.java      | 193 ++++----
 .../nemo/runtime/master/scheduler/Scheduler.java   |   9 +-
 .../runtime/master/scheduler/SchedulerRunner.java  |  13 +-
 .../runtime/master/scheduler/SchedulingPolicy.java |  16 +-
 .../SourceLocationAwareSchedulingPolicy.java       | 242 ++++++++++
 .../snu/nemo/tests/runtime/RuntimeTestUtil.java    | 183 ++------
 .../runtime/executor/TaskGroupExecutorTest.java    |   4 +
 .../executor/datatransfer/DataTransferTest.java    |  18 +-
 .../tests/runtime/master/ContainerManagerTest.java |  42 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     | 176 ++++----
 .../master/scheduler/FaultToleranceTest.java       | 154 +++----
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |  81 ++--
 .../SourceLocationAwareSchedulingPolicyTest.java   | 492 +++++++++++++++++++++
 31 files changed, 1811 insertions(+), 915 deletions(-)

diff --git a/common/src/main/java/edu/snu/nemo/common/ir/Readable.java b/common/src/main/java/edu/snu/nemo/common/ir/Readable.java
index c6a5dfa..44b6b05 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/Readable.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/Readable.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.common.ir;
 
 import java.io.Serializable;
+import java.util.List;
 
 /**
  * Interface for readable.
@@ -28,5 +29,14 @@ public interface Readable<O> extends Serializable {
    * @throws Exception exception while reading data.
    */
   Iterable<O> read() throws Exception;
+
+  /**
+   * Returns the list of locations where this readable resides.
+   * Each location has a complete copy of the readable.
+   * @return List of locations where this readable resides
+   * @throws UnsupportedOperationException when this operation is not supported
+   * @throws Exception any other exceptions on the way
+   */
+  List<String> getLocations() throws Exception;
 }
 
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
index fce6731..b64d61b 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/InitializedSourceVertex.java
@@ -82,8 +82,13 @@ public final class InitializedSourceVertex<T> extends SourceVertex<T> {
     }
 
     @Override
-    public Iterable<T> read() throws Exception {
+    public Iterable<T> read() {
       return this.initializedSourceData;
     }
+
+    @Override
+    public List<String> getLocations() {
+      throw new UnsupportedOperationException();
+    }
   }
 }
diff --git a/compiler/frontend/beam/pom.xml b/compiler/frontend/beam/pom.xml
index ed1f929..00e69a5 100644
--- a/compiler/frontend/beam/pom.xml
+++ b/compiler/frontend/beam/pom.xml
@@ -51,5 +51,16 @@ limitations under the License.
             <artifactId>beam-sdks-java-core</artifactId>
             <version>${beam.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.beam</groupId>
+            <artifactId>beam-sdks-java-io-hadoop-input-format</artifactId>
+            <version>${beam.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
index 9a6f42f..f03bc57 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/source/BeamBoundedSourceVertex.java
@@ -17,11 +17,15 @@ package edu.snu.nemo.compiler.frontend.beam.source;
 
 import edu.snu.nemo.common.ir.Readable;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.hadoop.inputformat.HadoopInputFormatIO;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,7 +77,7 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
    * BoundedSourceReadable class.
    * @param <T> type.
    */
-  private final class BoundedSourceReadable<T> implements Readable<T> {
+  private static final class BoundedSourceReadable<T> implements Readable<T> {
     private final BoundedSource<T> boundedSource;
 
     /**
@@ -94,5 +98,18 @@ public final class BeamBoundedSourceVertex<O> extends SourceVertex<O> {
       }
       return elements;
     }
+
+    @Override
+    public List<String> getLocations() throws Exception {
+      if (boundedSource instanceof HadoopInputFormatIO.HadoopInputFormatBoundedSource) {
+        final Field inputSplitField = boundedSource.getClass().getDeclaredField("inputSplit");
+        inputSplitField.setAccessible(true);
+        final InputSplit inputSplit = ((HadoopInputFormatIO.SerializableSplit) inputSplitField
+            .get(boundedSource)).getSplit();
+        return Arrays.asList(inputSplit.getLocations());
+      } else {
+        throw new UnsupportedOperationException();
+      }
+    }
   }
 }
diff --git a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkBoundedSourceVertex.java b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkBoundedSourceVertex.java
index e53109b..e1270d9 100644
--- a/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkBoundedSourceVertex.java
+++ b/compiler/frontend/spark/src/main/java/edu/snu/nemo/compiler/frontend/spark/source/SparkBoundedSourceVertex.java
@@ -108,5 +108,10 @@ public final class SparkBoundedSourceVertex<T> extends SourceVertex<T> {
       return () -> JavaConverters.asJavaIteratorConverter(
           rdd.iterator(rdd.getPartitions()[partitionIndex], TaskContext$.MODULE$.empty())).asJava();
     }
+
+    @Override
+    public List<String> getLocations() {
+      throw new UnsupportedOperationException();
+    }
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
index e4d9c65..29000d7 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/examples/EmptyComponents.java
@@ -112,5 +112,9 @@ public class EmptyComponents {
     public Iterable<T> read() {
       return new ArrayList<>();
     }
+    @Override
+    public List<String> getLocations() {
+      throw new UnsupportedOperationException();
+    }
   }
 }
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 521990e..c539d7a 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -144,7 +144,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
   /**
    * Scheduler timeout in ms.
    */
-  @NamedParameter(doc = "Scheduler timeout in ms", short_name = "scheduler_timeout_ms", default_value = "10000")
+  @NamedParameter(doc = "Scheduler timeout in ms", short_name = "scheduler_timeout_ms", default_value = "50")
   public final class SchedulerTimeoutMs implements Name<Integer> {
   }
 
diff --git a/examples/resources/sample_executor_resources.json b/examples/resources/sample_executor_resources.json
index f4d8bc0..5f5b2e0 100644
--- a/examples/resources/sample_executor_resources.json
+++ b/examples/resources/sample_executor_resources.json
@@ -2,16 +2,16 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 6
+    "capacity": 1
   },
   {
     "type": "Reserved",
     "memory_mb": 512,
-    "capacity": 6
+    "capacity": 1
   },
   {
     "type": "Compute",
     "memory_mb": 512,
-    "capacity": 6
+    "capacity": 1
   }
 ]
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 70a6928..3d37a60 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -202,7 +202,6 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
       case BlockStateChanged:
       case ExecutorFailed:
       case DataSizeMetric:
-      case ContainerFailed:
       case MetricMessageReceived:
         return MessageType.Send;
       case RequestBlockLocation:
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
index 9de7e03..94e1332 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/BoundedSourceTask.java
@@ -44,9 +44,8 @@ public final class BoundedSourceTask<O> extends Task {
 
   /**
    * @return the readable of source data.
-   * @throws Exception if fail to get.
    */
-  public Readable<O> getReadable() throws Exception {
+  public Readable<O> getReadable() {
     return readable;
   }
 }
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index f182d0b..7fa24e5 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -27,8 +27,7 @@ enum MessageType {
     RequestBlockLocation = 4;
     BlockLocationInfo = 5;
     ExecutorFailed = 6;
-    ContainerFailed = 7;
-    MetricMessageReceived = 8;
+    MetricMessageReceived = 7;
 }
 
 message Message {
diff --git a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
index 356a5f5..78896df 100644
--- a/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
+++ b/runtime/driver/src/main/java/edu/snu/nemo/driver/UserApplicationRunner.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.driver;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
@@ -26,6 +27,7 @@ import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.policy.Policy;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.tang.Injector;
@@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.util.Base64;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Compiles and runs User application.
@@ -98,7 +101,19 @@ public final class UserApplicationRunner implements Runnable {
       final PhysicalPlan physicalPlan = backend.compile(optimizedDAG);
 
       physicalPlan.getStageDAG().storeJSON(dagDirectory, "plan", "physical execution plan by compiler");
-      runtimeMaster.execute(physicalPlan, maxScheduleAttempt);
+
+      // Execute!
+      final Pair<JobStateManager, ScheduledExecutorService> executionResult =
+          runtimeMaster.execute(physicalPlan, maxScheduleAttempt);
+
+      // Wait for the job to finish and stop logging
+      final JobStateManager jobStateManager = executionResult.left();
+      final ScheduledExecutorService dagLoggingExecutor = executionResult.right();
+      jobStateManager.waitUntilFinish();
+      dagLoggingExecutor.shutdown();
+
+      jobStateManager.storeJSON(dagDirectory, "final");
+      LOG.info("{} is complete!", physicalPlan.getId());
       runtimeMaster.terminate();
     } catch (final Exception e) {
       throw new RuntimeException(e);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index f28d893..4dc2e4c 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.runtime.master;
 
 import edu.snu.nemo.common.exception.IllegalMessageException;
+import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.runtime.common.exception.AbsentBlockException;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -42,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static edu.snu.nemo.runtime.common.state.BlockState.State.SCHEDULED;
-import static edu.snu.nemo.runtime.master.RuntimeMaster.convertBlockState;
 
 /**
  * Master-side block manager.
@@ -331,7 +331,7 @@ public final class BlockManagerMaster {
             final ControlMessage.BlockStateChangedMsg blockStateChangedMsg =
                 message.getBlockStateChangedMsg();
             final String blockId = blockStateChangedMsg.getBlockId();
-            onBlockStateChanged(blockId, RuntimeMaster.convertBlockState(blockStateChangedMsg.getState()),
+            onBlockStateChanged(blockId, convertBlockState(blockStateChangedMsg.getState()),
                 blockStateChangedMsg.getLocation());
             break;
           default:
@@ -435,4 +435,53 @@ public final class BlockManagerMaster {
       return locationFuture;
     }
   }
+
+  /**
+   * Return the corresponding {@link BlockState.State} for the specified {@link ControlMessage.BlockStateFromExecutor}.
+   * @param state {@link ControlMessage.BlockStateFromExecutor}
+   * @return the corresponding {@link BlockState.State}
+   */
+  public static BlockState.State convertBlockState(final ControlMessage.BlockStateFromExecutor state) {
+    switch (state) {
+      case BLOCK_READY:
+        return BlockState.State.READY;
+      case SCHEDULED:
+        return BlockState.State.SCHEDULED;
+      case COMMITTED:
+        return BlockState.State.COMMITTED;
+      case LOST_BEFORE_COMMIT:
+        return BlockState.State.LOST_BEFORE_COMMIT;
+      case LOST:
+        return BlockState.State.LOST;
+      case REMOVED:
+        return BlockState.State.REMOVED;
+      default:
+        throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
+    }
+  }
+
+  /**
+   * Return the corresponding {@link ControlMessage.BlockStateFromExecutor} for the specified {@link BlockState.State}.
+   * @param state {@link BlockState.State}
+   * @return the corresponding {@link ControlMessage.BlockStateFromExecutor}
+   */
+  public static ControlMessage.BlockStateFromExecutor convertBlockState(final BlockState.State state) {
+    switch (state) {
+      case READY:
+        return ControlMessage.BlockStateFromExecutor.BLOCK_READY;
+      case SCHEDULED:
+        return ControlMessage.BlockStateFromExecutor.SCHEDULED;
+      case COMMITTED:
+        return ControlMessage.BlockStateFromExecutor.COMMITTED;
+      case LOST_BEFORE_COMMIT:
+        return ControlMessage.BlockStateFromExecutor.LOST_BEFORE_COMMIT;
+      case LOST:
+        return ControlMessage.BlockStateFromExecutor.LOST;
+      case REMOVED:
+        return ControlMessage.BlockStateFromExecutor.REMOVED;
+      default:
+        throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
+    }
+  }
+
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index be3a059..532ebd6 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -15,6 +15,7 @@
  */
 package edu.snu.nemo.runtime.master;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
@@ -24,13 +25,11 @@ import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
-import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
-import edu.snu.nemo.runtime.master.scheduler.SchedulerRunner;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -42,24 +41,23 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.TreeNode;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import javax.inject.Inject;
-import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
 
 import static edu.snu.nemo.runtime.common.state.TaskGroupState.State.COMPLETE;
 import static edu.snu.nemo.runtime.common.state.TaskGroupState.State.ON_HOLD;
 
 /**
+ * (WARNING) Use runtimeMasterThread for all public methods to avoid race conditions.
+ *
  * Runtime Master is the central controller of Runtime.
  * Compiler submits an {@link PhysicalPlan} to Runtime Master to execute a job.
  * Runtime Master handles:
- *    a) Scheduling the job with {@link Scheduler}, {@link SchedulerRunner}, {@link PendingTaskGroupQueue}.
+ *    a) Scheduling the job with {@link Scheduler}.
  *    b) Managing resources with {@link ContainerManager}.
  *    c) Managing blocks with {@link BlockManagerMaster}.
  *    d) Receiving and sending control messages with {@link MessageEnvironment}.
@@ -70,11 +68,9 @@ public final class RuntimeMaster {
   private static final Logger LOG = LoggerFactory.getLogger(RuntimeMaster.class.getName());
   private static final int DAG_LOGGING_PERIOD = 3000;
 
-  private final ExecutorService masterControlEventExecutor;
+  private final ExecutorService runtimeMasterThread;
 
   private final Scheduler scheduler;
-  private final SchedulerRunner schedulerRunner;
-  private final PendingTaskGroupQueue pendingTaskGroupQueue;
   private final ContainerManager containerManager;
   private final BlockManagerMaster blockManagerMaster;
   private final MetricMessageHandler metricMessageHandler;
@@ -91,21 +87,17 @@ public final class RuntimeMaster {
 
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
-                       final SchedulerRunner schedulerRunner,
-                       final PendingTaskGroupQueue pendingTaskGroupQueue,
                        final ContainerManager containerManager,
                        final BlockManagerMaster blockManagerMaster,
                        final MetricMessageHandler metricMessageHandler,
                        final MessageEnvironment masterMessageEnvironment,
                        @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
-    // We would like to keep the master event thread pool single threaded
+    // We would like to use a single thread for runtime master operations
     // since the processing logic in master takes a very short amount of time
     // compared to the job completion times of executed jobs
     // and keeping it single threaded removes the complexity of multi-thread synchronization.
-    this.masterControlEventExecutor = Executors.newSingleThreadExecutor();
+    this.runtimeMasterThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "RuntimeMaster"));
     this.scheduler = scheduler;
-    this.schedulerRunner = schedulerRunner;
-    this.pendingTaskGroupQueue = pendingTaskGroupQueue;
     this.containerManager = containerManager;
     this.blockManagerMaster = blockManagerMaster;
     this.metricMessageHandler = metricMessageHandler;
@@ -123,53 +115,47 @@ public final class RuntimeMaster {
    * @param plan to execute.
    * @param maxScheduleAttempt the max number of times this plan/sub-part of the plan should be attempted.
    */
-  public void execute(final PhysicalPlan plan,
-                      final int maxScheduleAttempt) {
-    this.irVertices.addAll(plan.getTaskIRVertexMap().values());
-    try {
-      final JobStateManager jobStateManager =
-          new JobStateManager(plan, blockManagerMaster, metricMessageHandler, maxScheduleAttempt);
-
-      scheduler.scheduleJob(plan, jobStateManager);
-
-      // Schedule dag logging thread
-      final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging(jobStateManager);
-
-      // Wait for the job to finish and stop logging
-      jobStateManager.waitUntilFinish();
-      dagLoggingExecutor.shutdown();
+  public Pair<JobStateManager, ScheduledExecutorService> execute(final PhysicalPlan plan,
+                                                                 final int maxScheduleAttempt) {
+    final Callable<Pair<JobStateManager, ScheduledExecutorService>> jobExecutionCallable = () -> {
+      this.irVertices.addAll(plan.getTaskIRVertexMap().values());
+      try {
+        final JobStateManager jobStateManager =
+            new JobStateManager(plan, blockManagerMaster, metricMessageHandler, maxScheduleAttempt);
+        scheduler.scheduleJob(plan, jobStateManager);
+        final ScheduledExecutorService dagLoggingExecutor = scheduleDagLogging(jobStateManager);
+        return Pair.of(jobStateManager, dagLoggingExecutor);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    };
 
-      jobStateManager.storeJSON(dagDirectory, "final");
-      LOG.info("{} is complete!", plan.getId());
+    try {
+      return runtimeMasterThread.submit(jobExecutionCallable).get();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
   public void terminate() {
-    try {
-      masterControlEventExecutor.shutdown();
+    runtimeMasterThread.execute(() -> {
 
       scheduler.terminate();
-      schedulerRunner.terminate();
-      pendingTaskGroupQueue.close();
-      masterMessageEnvironment.close();
+      try {
+        masterMessageEnvironment.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
       metricMessageHandler.terminate();
+      containerManager.terminate();
 
-      final Future<Boolean> allExecutorsClosed = containerManager.terminate();
-      if (allExecutorsClosed.get()) {
-        LOG.info("All executors were closed successfully!");
-      } else {
-        LOG.error("Failed to shutdown all executors. See log exceptions for details. Terminating RuntimeMaster.");
-      }
-    } catch (Exception e) {
-      new ContainerException(new Throwable("An exception occurred while trying to terminate ContainerManager"));
-      e.printStackTrace();
-    }
+    });
+
+    // Do not shutdown runtimeMasterThread. We need it to clean things up.
   }
 
   public void requestContainer(final String resourceSpecificationString) {
-    final Future<?> containerRequestEventResult = masterControlEventExecutor.submit(() -> {
+    final Future<?> containerRequestEventResult = runtimeMasterThread.submit(() -> {
       try {
         final TreeNode jsonRootNode = objectMapper.readTree(resourceSpecificationString);
 
@@ -205,8 +191,11 @@ public final class RuntimeMaster {
   public void onContainerAllocated(final String executorId,
                                    final AllocatedEvaluator allocatedEvaluator,
                                    final Configuration executorConfiguration) {
-    masterControlEventExecutor.execute(() ->
-      containerManager.onContainerAllocated(executorId, allocatedEvaluator, executorConfiguration));
+    runtimeMasterThread.execute(() -> {
+
+      containerManager.onContainerAllocated(executorId, allocatedEvaluator, executorConfiguration);
+
+    });
   }
 
   /**
@@ -216,14 +205,18 @@ public final class RuntimeMaster {
    */
   public boolean onExecutorLaunched(final ActiveContext activeContext) {
     final Callable<Boolean> processExecutorLaunchedEvent = () -> {
-      containerManager.onExecutorLaunched(activeContext);
-      scheduler.onExecutorAdded(activeContext.getId());
-      return (resourceRequestCount.decrementAndGet() == 0);
+      final Optional<ExecutorRepresenter> executor = containerManager.onContainerLaunched(activeContext);
+      if (executor.isPresent()) {
+        scheduler.onExecutorAdded(executor.get());
+        return (resourceRequestCount.decrementAndGet() == 0);
+      } else {
+        return false;
+      }
     };
 
     final boolean eventResult;
     try {
-      eventResult = masterControlEventExecutor.submit(processExecutorLaunchedEvent).get();
+      eventResult = runtimeMasterThread.submit(processExecutorLaunchedEvent).get();
     } catch (final Exception e) {
       throw new ContainerException(e);
     }
@@ -235,8 +228,8 @@ public final class RuntimeMaster {
    * @param failedExecutorId of the failed executor.
    */
   public void onExecutorFailed(final String failedExecutorId) {
-    masterControlEventExecutor.execute(() -> {
-      containerManager.onExecutorRemoved(failedExecutorId);
+    runtimeMasterThread.execute(() -> {
+      LOG.error(failedExecutorId + " executor failed");
       scheduler.onExecutorRemoved(failedExecutorId);
     });
   }
@@ -247,7 +240,11 @@ public final class RuntimeMaster {
   public final class MasterControlMessageReceiver implements MessageListener<ControlMessage.Message> {
     @Override
     public void onMessage(final ControlMessage.Message message) {
-      masterControlEventExecutor.execute(() -> handleControlMessage(message));
+      runtimeMasterThread.execute(() -> {
+
+        handleControlMessage(message);
+
+      });
     }
 
     @Override
@@ -274,16 +271,12 @@ public final class RuntimeMaster {
           convertFailureCause(taskGroupStateChangedMsg.getFailureCause()));
       break;
     case ExecutorFailed:
+      // Executor failed due to user code.
       final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
       final String failedExecutorId = executorFailedMsg.getExecutorId();
       final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
       LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
-      containerManager.onExecutorRemoved(failedExecutorId);
       throw new RuntimeException(exception);
-    case ContainerFailed:
-      final ControlMessage.ContainerFailedMsg containerFailedMsg = message.getContainerFailedMsg();
-      LOG.error(containerFailedMsg.getExecutorId() + " failed");
-      break;
     case DataSizeMetric:
       final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = message.getDataSizeMetricMsg();
       // TODO #511: Refactor metric aggregation for (general) run-rime optimization.
@@ -311,7 +304,7 @@ public final class RuntimeMaster {
    * @param srcVertexId   the ID of the source vertex.
    * @param blockId       the ID of the block.
    */
-  public void accumulateBarrierMetric(final List<Long> blockSizeInfo,
+  private void accumulateBarrierMetric(final List<Long> blockSizeInfo,
                                       final String srcVertexId,
                                       final String blockId) {
     final IRVertex vertexToSendMetricDataTo = irVertices.stream()
@@ -347,44 +340,6 @@ public final class RuntimeMaster {
     }
   }
 
-  public static BlockState.State convertBlockState(final ControlMessage.BlockStateFromExecutor state) {
-    switch (state) {
-    case BLOCK_READY:
-      return BlockState.State.READY;
-    case SCHEDULED:
-      return BlockState.State.SCHEDULED;
-    case COMMITTED:
-      return BlockState.State.COMMITTED;
-    case LOST_BEFORE_COMMIT:
-      return BlockState.State.LOST_BEFORE_COMMIT;
-    case LOST:
-      return BlockState.State.LOST;
-    case REMOVED:
-      return BlockState.State.REMOVED;
-    default:
-      throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
-    }
-  }
-
-  public static ControlMessage.BlockStateFromExecutor convertBlockState(final BlockState.State state) {
-    switch (state) {
-      case READY:
-        return ControlMessage.BlockStateFromExecutor.BLOCK_READY;
-      case SCHEDULED:
-        return ControlMessage.BlockStateFromExecutor.SCHEDULED;
-      case COMMITTED:
-        return ControlMessage.BlockStateFromExecutor.COMMITTED;
-      case LOST_BEFORE_COMMIT:
-        return ControlMessage.BlockStateFromExecutor.LOST_BEFORE_COMMIT;
-      case LOST:
-        return ControlMessage.BlockStateFromExecutor.LOST;
-      case REMOVED:
-        return ControlMessage.BlockStateFromExecutor.REMOVED;
-      default:
-        throw new UnknownExecutionStateException(new Exception("This BlockState is unknown: " + state));
-    }
-  }
-
   private TaskGroupState.RecoverableFailureCause convertFailureCause(
       final ControlMessage.RecoverableFailureCause cause) {
     switch (cause) {
@@ -417,20 +372,4 @@ public final class RuntimeMaster {
 
     return dagLoggingExecutor;
   }
-
-  /**
-   * Map function that converts a json string to a java map object.
-   */
-  final class JsonStringToMapFunction implements Function<String, Map<String, Object>> {
-    @Override
-    public Map<String, Object> apply(final String s) {
-      try {
-        return objectMapper.readValue(s, new TypeReference<Map<String, String>>() { });
-      } catch (final IOException e) {
-        e.printStackTrace();
-        throw new IllegalMessageException(
-            new Exception("The metric message format is incorrect. It should be in Json format: " + s));
-      }
-    }
-  }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
index 03f3b2e..7d3d917 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ContainerManager.java
@@ -17,11 +17,8 @@ package edu.snu.nemo.runtime.master.resource;
 
 import edu.snu.nemo.common.exception.ContainerException;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
@@ -29,28 +26,35 @@ import org.apache.reef.driver.evaluator.EvaluatorRequest;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.tang.Configuration;
 
+import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 
 import org.apache.reef.tang.annotations.Parameter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Encapsulates REEF's evaluator management for executors.
- * Serves as a single point of container/executor management in Runtime.
+ * (WARNING) This class is not thread-safe.
+ * Only a single thread should use the methods of this class.
+ * (i.e., runtimeMasterThread in RuntimeMaster)
+ *
+ * Encapsulates REEF's evaluator management for containers.
+ * Serves as a single point of container management in Runtime.
  * We define a unit of resource a container (an evaluator in REEF), and launch a single executor on each container.
  */
 // TODO #60: Specify Types in Requesting Containers
 // We need an overall cleanup of this class after #60 is resolved.
 @DriverSide
+@NotThreadSafe
 public final class ContainerManager {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerManager.class.getName());
 
+  private boolean isTerminated;
+
   private final EvaluatorRequestor evaluatorRequestor;
   private final MessageEnvironment messageEnvironment;
   private final ExecutorService serializationExecutorService; // Executor service for scheduling message serialization.
@@ -61,38 +65,18 @@ public final class ContainerManager {
   private final Map<String, CountDownLatch> requestLatchByResourceSpecId;
 
   /**
-   * A map containing a list of executor representations for each container type.
-   */
-  private final Map<String, List<ExecutorRepresenter>> executorsByContainerType;
-
-  /**
-   * A map of executor ID to the corresponding {@link ExecutorRepresenter}.
-   */
-  private final Map<String, ExecutorRepresenter> executorRepresenterMap;
-
-  /**
-   * A map of failed executor ID to the corresponding failed {@link ExecutorRepresenter}.
-   */
-  private final Map<String, ExecutorRepresenter> failedExecutorRepresenterMap;
-
-  /**
    * Keeps track of evaluator and context requests.
    */
   private final Map<String, ResourceSpecification> pendingContextIdToResourceSpec;
   private final Map<String, List<ResourceSpecification>> pendingContainerRequestsByContainerType;
 
-  private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
-
   @Inject
-  public ContainerManager(@Parameter(JobConf.ScheduleSerThread.class) final int scheduleSerThread,
-                          final EvaluatorRequestor evaluatorRequestor,
-                          final MessageEnvironment messageEnvironment) {
+  private ContainerManager(@Parameter(JobConf.ScheduleSerThread.class) final int scheduleSerThread,
+                           final EvaluatorRequestor evaluatorRequestor,
+                           final MessageEnvironment messageEnvironment) {
+    this.isTerminated = false;
     this.evaluatorRequestor = evaluatorRequestor;
     this.messageEnvironment = messageEnvironment;
-    this.persistentConnectionToMasterMap = new PersistentConnectionToMasterMap(messageEnvironment);
-    this.executorsByContainerType = new HashMap<>();
-    this.executorRepresenterMap = new HashMap<>();
-    this.failedExecutorRepresenterMap = new HashMap<>();
     this.pendingContextIdToResourceSpec = new HashMap<>();
     this.pendingContainerRequestsByContainerType = new HashMap<>();
     this.requestLatchByResourceSpecId = new HashMap<>();
@@ -104,15 +88,18 @@ public final class ContainerManager {
    * @param numToRequest number of containers to request
    * @param resourceSpecification containing the specifications of
    */
-  public synchronized void requestContainer(final int numToRequest,
-                                            final ResourceSpecification resourceSpecification) {
+  public void requestContainer(final int numToRequest, final ResourceSpecification resourceSpecification) {
+    if (isTerminated) {
+      LOG.info("ContainerManager is terminated, ignoring {}", resourceSpecification.toString());
+      return;
+    }
+
     if (numToRequest > 0) {
       // Create a list of executor specifications to be used when containers are allocated.
       final List<ResourceSpecification> resourceSpecificationList = new ArrayList<>(numToRequest);
       for (int i = 0; i < numToRequest; i++) {
         resourceSpecificationList.add(resourceSpecification);
       }
-      executorsByContainerType.putIfAbsent(resourceSpecification.getContainerType(), new ArrayList<>(numToRequest));
 
       // Mark the request as pending with the given specifications.
       pendingContainerRequestsByContainerType.putIfAbsent(resourceSpecification.getContainerType(), new ArrayList<>());
@@ -139,25 +126,15 @@ public final class ContainerManager {
    * @param allocatedContainer the allocated container.
    * @param executorConfiguration executor related configuration.
    */
-  public synchronized void onContainerAllocated(final String executorId,
-                                                final AllocatedEvaluator allocatedContainer,
-                                                final Configuration executorConfiguration) {
-    onContainerAllocated(selectResourceSpecForContainer(), executorId,
-        allocatedContainer, executorConfiguration);
-  }
+  public void onContainerAllocated(final String executorId, final AllocatedEvaluator allocatedContainer,
+                                   final Configuration executorConfiguration) {
+    if (isTerminated) {
+      LOG.info("ContainerManager is terminated, closing {}", allocatedContainer.getId());
+      allocatedContainer.close();
+      return;
+    }
 
-  // To be exposed as a public synchronized method in place of the above "onContainerAllocated"
-  /**
-   * Launches executor once a container is allocated.
-   * @param resourceSpecification of the executor to be launched.
-   * @param executorId of the executor to be launched.
-   * @param allocatedContainer the allocated container.
-   * @param executorConfiguration executor related configuration.
-   */
-  private void onContainerAllocated(final ResourceSpecification resourceSpecification,
-                                    final String executorId,
-                                    final AllocatedEvaluator allocatedContainer,
-                                    final Configuration executorConfiguration) {
+    final ResourceSpecification resourceSpecification = selectResourceSpecForContainer();
     LOG.info("Container type (" + resourceSpecification.getContainerType()
         + ") allocated, will be used for [" + executorId + "]");
     pendingContextIdToResourceSpec.put(executorId, resourceSpecification);
@@ -166,37 +143,22 @@ public final class ContainerManager {
   }
 
   /**
-   * Selects an executor specification for the executor to be launched on a container.
-   * Important! This is a "hack" to get around the inability to mark evaluators with Node Labels in REEF.
-   * @return the selected executor specification.
+   * Initializes master's connection to the container once launched.
+   * A representation of the executor to reside in master is created.
+   *
+   * @param activeContext for the launched container.
+   * @return a representation of the executor. (return an empty Optional if terminated)
    */
-  private ResourceSpecification selectResourceSpecForContainer() {
-    ResourceSpecification selectedResourceSpec = null;
-    for (final Map.Entry<String, List<ResourceSpecification>> entry
-        : pendingContainerRequestsByContainerType.entrySet()) {
-      if (entry.getValue().size() > 0) {
-        selectedResourceSpec = entry.getValue().remove(0);
-        break;
-      }
+  public Optional<ExecutorRepresenter> onContainerLaunched(final ActiveContext activeContext) {
+    if (isTerminated) {
+      LOG.info("ContainerManager is terminated, closing {}", activeContext.getId());
+      activeContext.close();
+      return Optional.empty();
     }
 
-    if (selectedResourceSpec != null) {
-      return selectedResourceSpec;
-    }
-    throw new ContainerException(new Throwable("We never requested for an extra container"));
-  }
-
-  /**
-   * Initializes master's connection to the executor once launched.
-   * A representation of the executor to reside in master is created.
-   * @param activeContext for the launched executor.
-   */
-  public synchronized void onExecutorLaunched(final ActiveContext activeContext) {
     // We set contextId = executorId in NemoDriver when we generate executor configuration.
     final String executorId = activeContext.getId();
 
-    LOG.info("[" + executorId + "] is up and running");
-
     final ResourceSpecification resourceSpec = pendingContextIdToResourceSpec.remove(executorId);
 
     // Connect to the executor and initiate Master side's executor representation.
@@ -210,74 +172,40 @@ public final class ContainerManager {
 
     // Create the executor representation.
     final ExecutorRepresenter executorRepresenter =
-        new ExecutorRepresenter(executorId, resourceSpec, messageSender, activeContext, serializationExecutorService);
+        new ExecutorRepresenter(executorId, resourceSpec, messageSender, activeContext, serializationExecutorService,
+            activeContext.getEvaluatorDescriptor().getNodeDescriptor().getName());
 
-    executorsByContainerType.putIfAbsent(resourceSpec.getContainerType(), new ArrayList<>());
-    executorsByContainerType.get(resourceSpec.getContainerType()).add(executorRepresenter);
-    executorRepresenterMap.put(executorId, executorRepresenter);
+    LOG.info("{} is up and running at {}", executorId, executorRepresenter.getNodeName());
 
     requestLatchByResourceSpecId.get(resourceSpec.getResourceSpecId()).countDown();
+    return Optional.of(executorRepresenter);
   }
 
-  public synchronized void onExecutorRemoved(final String failedExecutorId) {
-    LOG.info("[" + failedExecutorId + "] failure reported.");
-
-    final ExecutorRepresenter failedExecutor = executorRepresenterMap.remove(failedExecutorId);
-    failedExecutor.onExecutorFailed();
-
-    executorsByContainerType.get(failedExecutor.getContainerType()).remove(failedExecutor);
-
-    failedExecutorRepresenterMap.put(failedExecutorId, failedExecutor);
-
-    // Signal RuntimeMaster on CONTAINER_FAILURE type FAILED_RECOVERABLE state
-    persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
-        ControlMessage.Message.newBuilder()
-            .setId(RuntimeIdGenerator.generateMessageId())
-            .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-            .setType(ControlMessage.MessageType.ContainerFailed)
-            .setContainerFailedMsg(ControlMessage.ContainerFailedMsg.newBuilder()
-                .setExecutorId(failedExecutorId)
-                .build())
-            .build());
-  }
-
-  public synchronized Map<String, ExecutorRepresenter> getExecutorRepresenterMap() {
-    return executorRepresenterMap;
-  }
-
-  public synchronized Map<String, ExecutorRepresenter> getFailedExecutorRepresenterMap() {
-    return failedExecutorRepresenterMap;
-  }
-
-  /**
-   * Shuts down the running executors.
-   */
-  private void shutdownRunningExecutors() {
-    executorRepresenterMap.entrySet().forEach(e -> e.getValue().shutDown());
-    executorRepresenterMap.clear();
+  public void terminate() {
+    if (isTerminated) {
+      throw new IllegalStateException("Cannot terminate twice");
+    }
+    isTerminated = true;
   }
 
   /**
-   * Terminates ContainerManager.
-   * Before we terminate, we must wait for all the executors we requested
-   * and shutdown all of them if any of them is running.
-   * @return a future that returns a boolean on whether all requested resources were allocated and released.
+   * Selects an executor specification for the executor to be launched on a container.
+   * Important! This is a "hack" to get around the inability to mark evaluators with Node Labels in REEF.
+   * @return the selected executor specification.
    */
-  public synchronized Future<Boolean> terminate() {
-    shutdownRunningExecutors();
-    return Executors.newSingleThreadExecutor().submit(() -> waitForAllRequestedResources());
-  }
-
-  private boolean waitForAllRequestedResources() {
-    requestLatchByResourceSpecId.forEach((resourceSpecId, latchForRequest) -> {
-      try {
-        latchForRequest.await();
-      } catch (InterruptedException e) {
-        throw new ContainerException(e);
+  private ResourceSpecification selectResourceSpecForContainer() {
+    ResourceSpecification selectedResourceSpec = null;
+    for (final Map.Entry<String, List<ResourceSpecification>> entry
+        : pendingContainerRequestsByContainerType.entrySet()) {
+      if (entry.getValue().size() > 0) {
+        selectedResourceSpec = entry.getValue().remove(0);
+        break;
       }
-    });
-    shutdownRunningExecutors();
-    requestLatchByResourceSpecId.clear();
-    return executorRepresenterMap.isEmpty();
+    }
+
+    if (selectedResourceSpec != null) {
+      return selectedResourceSpec;
+    }
+    throw new ContainerException(new Throwable("We never requested for an extra container"));
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index 2bf3915..c76e64a 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -24,18 +24,23 @@ import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
+import javax.annotation.concurrent.NotThreadSafe;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 /**
+ * (WARNING) This class is not thread-safe.
+ *
  * Contains information/state regarding an executor.
  * Such information may include:
  *    a) The executor's resource type.
  *    b) The executor's capacity (ex. number of cores).
  *    c) Task groups scheduled/launched for the executor.
- *    d) (Please add other information as we implement more features).
+ *    d) Name of the physical node which hosts this executor.
+ *    e) (Please add other information as we implement more features).
  */
+@NotThreadSafe
 public final class ExecutorRepresenter {
 
   private final String executorId;
@@ -46,12 +51,23 @@ public final class ExecutorRepresenter {
   private final MessageSender<ControlMessage.Message> messageSender;
   private final ActiveContext activeContext;
   private final ExecutorService serializationExecutorService;
-
+  private final String nodeName;
+
+  /**
+   * Creates a reference to the specified executor.
+   * @param executorId the executor id
+   * @param resourceSpecification specification for the executor
+   * @param messageSender provides communication context for this executor
+   * @param activeContext context on the corresponding REEF evaluator
+   * @param serializationExecutorService provides threads for message serialization
+   * @param nodeName physical name of the node where this executor resides
+   */
   public ExecutorRepresenter(final String executorId,
                              final ResourceSpecification resourceSpecification,
                              final MessageSender<ControlMessage.Message> messageSender,
                              final ActiveContext activeContext,
-                             final ExecutorService serializationExecutorService) {
+                             final ExecutorService serializationExecutorService,
+                             final String nodeName) {
     this.executorId = executorId;
     this.resourceSpecification = resourceSpecification;
     this.messageSender = messageSender;
@@ -60,13 +76,21 @@ public final class ExecutorRepresenter {
     this.failedTaskGroups = new HashSet<>();
     this.activeContext = activeContext;
     this.serializationExecutorService = serializationExecutorService;
+    this.nodeName = nodeName;
   }
 
+  /**
+   * Marks all TaskGroups which were running in this executor as failed.
+   */
   public void onExecutorFailed() {
     runningTaskGroups.forEach(taskGroupId -> failedTaskGroups.add(taskGroupId));
     runningTaskGroups.clear();
   }
 
+  /**
+   * Marks the TaskGroup as running, and sends scheduling message to the executor.
+   * @param scheduledTaskGroup
+   */
   public void onTaskGroupScheduled(final ScheduledTaskGroup scheduledTaskGroup) {
     runningTaskGroups.add(scheduledTaskGroup.getTaskGroupId());
     failedTaskGroups.remove(scheduledTaskGroup.getTaskGroupId());
@@ -89,40 +113,84 @@ public final class ExecutorRepresenter {
     });
   }
 
+  /**
+   * Sends control message to the executor.
+   * @param message Message object to send
+   */
   public void sendControlMessage(final ControlMessage.Message message) {
     messageSender.send(message);
   }
 
+  /**
+   * Marks the specified TaskGroup as completed.
+   * @param taskGroupId id of the TaskGroup
+   */
   public void onTaskGroupExecutionComplete(final String taskGroupId) {
     runningTaskGroups.remove(taskGroupId);
     completeTaskGroups.add(taskGroupId);
   }
 
+  /**
+   * Marks the specified TaskGroup as failed.
+   * @param taskGroupId id of the TaskGroup
+   */
   public void onTaskGroupExecutionFailed(final String taskGroupId) {
     runningTaskGroups.remove(taskGroupId);
     failedTaskGroups.add(taskGroupId);
   }
 
+  /**
+   * @return how many TaskGroups can this executor simultaneously run
+   */
   public int getExecutorCapacity() {
     return resourceSpecification.getCapacity();
   }
 
+  /**
+   * @return set of ids of TaskGroups that are running in this executor
+   */
   public Set<String> getRunningTaskGroups() {
     return runningTaskGroups;
   }
 
+  /**
+   * @return set of ids of TaskGroups that have been failed in this executor
+   */
+  public Set<String> getFailedTaskGroups() {
+    return failedTaskGroups;
+  }
+
+  /**
+   * @return set of ids of TaskGroups that have been completed in this executor
+   */
   public Set<String> getCompleteTaskGroups() {
     return completeTaskGroups;
   }
 
+  /**
+   * @return the executor id
+   */
   public String getExecutorId() {
     return executorId;
   }
 
+  /**
+   * @return the container type
+   */
   public String getContainerType() {
     return resourceSpecification.getContainerType();
   }
 
+  /**
+   * @return physical name of the node where this executor resides
+   */
+  public String getNodeName() {
+    return nodeName;
+  }
+
+  /**
+   * Shuts down this executor.
+   */
   public void shutDown() {
     activeContext.close();
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index 9c22b4d..f12293f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -30,10 +30,12 @@ import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
 import java.util.stream.Collectors;
@@ -43,10 +45,14 @@ import org.slf4j.Logger;
 import static edu.snu.nemo.runtime.common.state.TaskGroupState.State.ON_HOLD;
 
 /**
+ * (WARNING) Only a single dedicated thread should use the public methods of this class.
+ * (i.e., runtimeMasterThread in RuntimeMaster)
+ *
  * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute and schedules the TaskGroups.
  * The policy by which it schedules them is dependent on the implementation of {@link SchedulingPolicy}.
  */
 @DriverSide
+@NotThreadSafe
 public final class BatchSingleJobScheduler implements Scheduler {
   private static final Logger LOG = LoggerFactory.getLogger(BatchSingleJobScheduler.class.getName());
   private static final int SCHEDULE_ATTEMPT_ON_CONTAINER_FAILURE = Integer.MAX_VALUE;
@@ -96,8 +102,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @param scheduledJobStateManager to keep track of the submitted job's states.
    */
   @Override
-  public synchronized void scheduleJob(final PhysicalPlan jobToSchedule,
-                                       final JobStateManager scheduledJobStateManager) {
+  public void scheduleJob(final PhysicalPlan jobToSchedule, final JobStateManager scheduledJobStateManager) {
     this.physicalPlan = jobToSchedule;
     this.jobStateManager = scheduledJobStateManager;
 
@@ -114,9 +119,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
   }
 
   @Override
-  public void updateJob(final String jobId,
-                        final PhysicalPlan newPhysicalPlan,
-                        final Pair<String, String> taskInfo) {
+  public void updateJob(final String jobId, final PhysicalPlan newPhysicalPlan, final Pair<String, String> taskInfo) {
     // update the job in the scheduler.
     // NOTE: what's already been executed is not modified in the new physical plan.
     this.physicalPlan = newPhysicalPlan;
@@ -134,10 +137,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @param taskPutOnHold the ID of task that are put on hold. It is null otherwise.
    */
   @Override
-  public void onTaskGroupStateChanged(final String executorId,
-                                      final String taskGroupId,
-                                      final TaskGroupState.State newState,
-                                      final int attemptIdx,
+  public void onTaskGroupStateChanged(final String executorId, final String taskGroupId,
+                                      final TaskGroupState.State newState, final int attemptIdx,
                                       @Nullable final String taskPutOnHold,
                                       final TaskGroupState.RecoverableFailureCause failureCause) {
     switch (newState) {
@@ -163,143 +164,13 @@ public final class BatchSingleJobScheduler implements Scheduler {
     }
   }
 
-  /**
-   * Action after task group execution has been completed, not after it has been put on hold.
-   *
-   * @param executorId  the ID of the executor.
-   * @param taskGroupId the ID pf the task group completed.
-   */
-  private void onTaskGroupExecutionComplete(final String executorId,
-                                            final String taskGroupId) {
-    onTaskGroupExecutionComplete(executorId, taskGroupId, false);
-  }
-
-  /**
-   * Action after task group execution has been completed.
-   * @param executorId id of the executor.
-   * @param taskGroupId the ID of the task group completed.
-   * @param isOnHoldToComplete whether or not if it is switched to complete after it has been on hold.
-   */
-  private void onTaskGroupExecutionComplete(final String executorId,
-                                            final String taskGroupId,
-                                            final Boolean isOnHoldToComplete) {
-    LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
-    if (!isOnHoldToComplete) {
-      schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
-    }
-
-    final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
-    if (jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) {
-      // if the stage this task group belongs to is complete,
-      if (!jobStateManager.checkJobTermination()) { // and if the job is not yet complete or failed,
-        scheduleNextStage(stageIdForTaskGroupUponCompletion);
-      }
-    }
-  }
-
-  /**
-   * Action for after task group execution is put on hold.
-   * @param executorId     the ID of the executor.
-   * @param taskGroupId    the ID of the task group.
-   * @param taskPutOnHold  the ID of task that is put on hold.
-   */
-  private void onTaskGroupExecutionOnHold(final String executorId,
-                                          final String taskGroupId,
-                                          final String taskPutOnHold) {
-    LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
-    schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
-    final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
-
-    final boolean stageComplete =
-        jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion);
-
-    if (stageComplete) {
-      // get optimization vertex from the task.
-      final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-          getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks list
-              .filter(task -> task.getId().equals(taskPutOnHold)) // find it
-              .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex
-              .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
-              .distinct()
-              .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
-              .findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it
-              + " called with failed task ids by some other task than "
-              + MetricCollectionBarrierTask.class.getSimpleName()));
-      // and we will use this vertex to perform metric collection and dynamic optimization.
-
-      pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
-          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId)));
-    } else {
-      onTaskGroupExecutionComplete(executorId, taskGroupId, true);
-    }
-  }
-
-  private void onTaskGroupExecutionFailedRecoverable(final String executorId, final String taskGroupId,
-                                                     final int attemptIdx, final TaskGroupState.State newState,
-                                                     final TaskGroupState.RecoverableFailureCause failureCause) {
-    LOG.info("{} failed in {} by {}", new Object[]{taskGroupId, executorId, failureCause});
-    schedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
-
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
-    final int attemptIndexForStage =
-        jobStateManager.getAttemptCountForStage(RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId));
-
-    switch (failureCause) {
-    // Previous task group must be re-executed, and incomplete task groups of the belonging stage must be rescheduled.
-    case INPUT_READ_FAILURE:
-      if (attemptIdx == attemptIndexForStage) {
-        jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-        LOG.info("All task groups of {} will be made failed_recoverable.", stageId);
-        for (final PhysicalStage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
-          if (stage.getId().equals(stageId)) {
-            LOG.info("Removing TaskGroups for {} before they are scheduled to an executor", stage.getId());
-            pendingTaskGroupQueue.removeTaskGroupsAndDescendants(stage.getId());
-            stage.getTaskGroupIds().forEach(dstTaskGroupId -> {
-              if (jobStateManager.getTaskGroupState(dstTaskGroupId).getStateMachine().getCurrentState()
-                  != TaskGroupState.State.COMPLETE) {
-                jobStateManager.onTaskGroupStateChanged(dstTaskGroupId, TaskGroupState.State.FAILED_RECOVERABLE);
-                blockManagerMaster.onProducerTaskGroupFailed(dstTaskGroupId);
-              }
-            });
-            break;
-          }
-        }
-        // the stage this task group belongs to has become failed recoverable.
-        // it is a good point to start searching for another stage to schedule.
-        scheduleNextStage(stageId);
-      } else if (attemptIdx < attemptIndexForStage) {
-        // if attemptIdx < attemptIndexForStage, we can ignore this late arriving message.
-        LOG.info("{} state change to failed_recoverable arrived late, we will ignore this.", taskGroupId);
-      } else {
-        throw new SchedulingException(new Throwable("AttemptIdx for a task group cannot be greater than its stage"));
-      }
-      break;
-    // The task group executed successfully but there is something wrong with the output store.
-    case OUTPUT_WRITE_FAILURE:
-      jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-      LOG.info("Only the failed task group will be retried.");
-
-      // the stage this task group belongs to has become failed recoverable.
-      // it is a good point to start searching for another stage to schedule.
-      blockManagerMaster.onProducerTaskGroupFailed(taskGroupId);
-      scheduleNextStage(stageId);
-      break;
-    case CONTAINER_FAILURE:
-      jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-      LOG.info("Only the failed task group will be retried.");
-      break;
-    default:
-      throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
-    }
-  }
-
   @Override
-  public synchronized void onExecutorAdded(final String executorId) {
-    schedulingPolicy.onExecutorAdded(executorId);
+  public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
+    schedulingPolicy.onExecutorAdded(executorRepresenter);
   }
 
   @Override
-  public synchronized void onExecutorRemoved(final String executorId) {
+  public void onExecutorRemoved(final String executorId) {
     final Set<String> taskGroupsToReExecute = new HashSet<>();
 
     // TaskGroups for lost blocks
@@ -321,7 +192,16 @@ public final class BatchSingleJobScheduler implements Scheduler {
     }
   }
 
-  private synchronized void scheduleRootStages() {
+  @Override
+  public void terminate() {
+    this.schedulerRunner.terminate();
+    this.pendingTaskGroupQueue.close();
+  }
+
+  /**
+   * Schedules the stages in the initial schedule group, in reverse-topological order.
+   */
+  private void scheduleRootStages() {
     final List<PhysicalStage> rootStages =
         physicalPlan.getStageDAG().getTopologicalSort().stream().filter(physicalStage ->
             physicalStage.getScheduleGroupIndex() == initialScheduleGroup)
@@ -334,7 +214,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * Schedules the next stage to execute after a stage completion.
    * @param completedStageId the ID of the stage that just completed and triggered this scheduling.
    */
-  private synchronized void scheduleNextStage(final String completedStageId) {
+  private void scheduleNextStage(final String completedStageId) {
     final PhysicalStage completeOrFailedStage = getStageById(completedStageId);
     final Optional<List<PhysicalStage>> nextStagesToSchedule =
         selectNextStagesToSchedule(completeOrFailedStage.getScheduleGroupIndex());
@@ -367,7 +247,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @return an optional of the (possibly empty) list of next schedulable stages, in the order they should be
    * enqueued to {@link PendingTaskGroupQueue}.
    */
-  private synchronized Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
+  private Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
     if (currentScheduleGroupIndex > initialScheduleGroup) {
       final Optional<List<PhysicalStage>> ancestorStagesFromAScheduleGroup =
           selectNextStagesToSchedule(currentScheduleGroupIndex - 1);
@@ -437,7 +317,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * It adds the list of task groups for the stage where the scheduler thread continuously polls from.
    * @param stageToSchedule the stage to schedule.
    */
-  private synchronized void scheduleStage(final PhysicalStage stageToSchedule) {
+  private void scheduleStage(final PhysicalStage stageToSchedule) {
     final List<PhysicalStageEdge> stageIncomingEdges =
         physicalPlan.getStageDAG().getIncomingEdgesOf(stageToSchedule.getId());
     final List<PhysicalStageEdge> stageOutgoingEdges =
@@ -526,8 +406,141 @@ public final class BatchSingleJobScheduler implements Scheduler {
     throw new RuntimeException(new Throwable("This taskGroupId does not exist in the plan"));
   }
 
-  @Override
-  public void terminate() {
-    // nothing to do yet.
+  /**
+   * Action after task group execution has been completed, not after it has been put on hold.
+   *
+   * @param executorId  the ID of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+                                            final String taskGroupId) {
+    onTaskGroupExecutionComplete(executorId, taskGroupId, false);
+  }
+
+  /**
+   * Action after task group execution has been completed.
+   * @param executorId id of the executor.
+   * @param taskGroupId the ID of the task group completed.
+   * @param isOnHoldToComplete whether it was switched to complete after being put on hold.
+   */
+  private void onTaskGroupExecutionComplete(final String executorId,
+                                            final String taskGroupId,
+                                            final Boolean isOnHoldToComplete) {
+    LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
+    if (!isOnHoldToComplete) {
+      schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+    }
+
+    final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+    if (jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) {
+      // if the stage this task group belongs to is complete,
+      if (!jobStateManager.checkJobTermination()) { // and if the job is not yet complete or failed,
+        scheduleNextStage(stageIdForTaskGroupUponCompletion);
+      }
+    }
+  }
+
+  /**
+   * Action after a task group execution is put on hold.
+   * @param executorId     the ID of the executor.
+   * @param taskGroupId    the ID of the task group.
+   * @param taskPutOnHold  the ID of task that is put on hold.
+   */
+  private void onTaskGroupExecutionOnHold(final String executorId,
+                                          final String taskGroupId,
+                                          final String taskPutOnHold) {
+    LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
+    schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+    final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+
+    final boolean stageComplete =
+        jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion);
+
+    if (stageComplete) {
+      // get optimization vertex from the task.
+      final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
+          getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks list
+              .filter(task -> task.getId().equals(taskPutOnHold)) // find it
+              .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex
+              .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
+              .distinct()
+              .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
+              .findFirst().orElseThrow(() -> new RuntimeException(ON_HOLD.name() // get it
+              + " called with failed task ids by some other task than "
+              + MetricCollectionBarrierTask.class.getSimpleName()));
+      // and we will use this vertex to perform metric collection and dynamic optimization.
+
+      pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
+          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId)));
+    } else {
+      onTaskGroupExecutionComplete(executorId, taskGroupId, true);
+    }
+  }
+
+  /**
+   * Action after a task group execution has failed with a recoverable cause.
+   * @param executorId    the ID of the executor
+   * @param taskGroupId   the ID of the task group
+   * @param attemptIdx    the attempt index
+   * @param newState      the new state of the task group
+   * @param failureCause  the cause of failure
+   */
+  private void onTaskGroupExecutionFailedRecoverable(final String executorId, final String taskGroupId,
+                                                     final int attemptIdx, final TaskGroupState.State newState,
+                                                     final TaskGroupState.RecoverableFailureCause failureCause) {
+    LOG.info("{} failed in {} by {}", new Object[]{taskGroupId, executorId, failureCause});
+    schedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
+
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+    final int attemptIndexForStage =
+        jobStateManager.getAttemptCountForStage(RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId));
+
+    switch (failureCause) {
+      // The previous task group must be re-executed, and incomplete task groups of its stage must be rescheduled.
+      case INPUT_READ_FAILURE:
+        if (attemptIdx == attemptIndexForStage) {
+          jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
+          LOG.info("All task groups of {} will be made failed_recoverable.", stageId);
+          for (final PhysicalStage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
+            if (stage.getId().equals(stageId)) {
+              LOG.info("Removing TaskGroups for {} before they are scheduled to an executor", stage.getId());
+              pendingTaskGroupQueue.removeTaskGroupsAndDescendants(stage.getId());
+              stage.getTaskGroupIds().forEach(dstTaskGroupId -> {
+                if (jobStateManager.getTaskGroupState(dstTaskGroupId).getStateMachine().getCurrentState()
+                    != TaskGroupState.State.COMPLETE) {
+                  jobStateManager.onTaskGroupStateChanged(dstTaskGroupId, TaskGroupState.State.FAILED_RECOVERABLE);
+                  blockManagerMaster.onProducerTaskGroupFailed(dstTaskGroupId);
+                }
+              });
+              break;
+            }
+          }
+          // the stage this task group belongs to has become failed recoverable.
+          // it is a good point to start searching for another stage to schedule.
+          scheduleNextStage(stageId);
+        } else if (attemptIdx < attemptIndexForStage) {
+          // if attemptIdx < attemptIndexForStage, we can ignore this late arriving message.
+          LOG.info("{} state change to failed_recoverable arrived late, we will ignore this.", taskGroupId);
+        } else {
+          throw new SchedulingException(new Throwable("AttemptIdx for a task group cannot be greater than its stage"));
+        }
+        break;
+      // The task group executed successfully but there is something wrong with the output store.
+      case OUTPUT_WRITE_FAILURE:
+        jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
+        LOG.info("Only the failed task group will be retried.");
+
+        // the stage this task group belongs to has become failed recoverable.
+        // it is a good point to start searching for another stage to schedule.
+        blockManagerMaster.onProducerTaskGroupFailed(taskGroupId);
+        scheduleNextStage(stageId);
+        break;
+      case CONTAINER_FAILURE:
+        jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
+        LOG.info("Only the failed task group will be retried.");
+        break;
+      default:
+        throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
+    }
   }
 }
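
The switch at the end of the scheduler above distinguishes three recoverable failure causes with different recovery scopes. The sketch below restates only that mapping in isolation; RecoveryScopeSketch is an illustrative name and the strings paraphrase the behavior, they are not scheduler code:

    /** Illustrative sketch of the recovery scope per failure cause; enum values mirror the ones used above. */
    final class RecoveryScopeSketch {
      enum RecoverableFailureCause { INPUT_READ_FAILURE, OUTPUT_WRITE_FAILURE, CONTAINER_FAILURE }

      /** Returns a description of what gets rescheduled for each cause. */
      static String recoveryScope(final RecoverableFailureCause cause) {
        switch (cause) {
          case INPUT_READ_FAILURE:
            // Incomplete TaskGroups of the whole stage are made FAILED_RECOVERABLE and rescheduled.
            return "reschedule the stage's incomplete TaskGroups";
          case OUTPUT_WRITE_FAILURE:
            // The TaskGroup ran, but its output could not be stored; only it is retried,
            // and its produced blocks are reported as lost.
            return "retry only the failed TaskGroup and mark its blocks as lost";
          case CONTAINER_FAILURE:
            // The container is gone; the TaskGroup is retried once resources are available again.
            return "retry only the failed TaskGroup";
          default:
            throw new IllegalArgumentException("Unknown cause: " + cause);
        }
      }

      public static void main(final String[] args) {
        for (final RecoverableFailureCause cause : RecoverableFailureCause.values()) {
          System.out.println(cause + " -> " + recoveryScope(cause));
        }
      }
    }
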
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
new file mode 100644
index 0000000..df89633
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ExecutorRegistry.java
@@ -0,0 +1,204 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+
+/**
+ * (WARNING) This class is not thread-safe.
+ * (i.e., only a single SchedulingPolicy should access this class)
+ *
+ * Maintains a map between executor ids and {@link ExecutorRepresenter}s.
+ */
+@DriverSide
+@NotThreadSafe
+public final class ExecutorRegistry {
+  private final Map<String, ExecutorRepresenter> runningExecutors;
+  private final Map<String, ExecutorRepresenter> failedExecutors;
+  private final Map<String, ExecutorRepresenter> completedExecutors;
+
+  @Inject
+  public ExecutorRegistry() {
+    this.runningExecutors = new HashMap<>();
+    this.failedExecutors = new HashMap<>();
+    this.completedExecutors = new HashMap<>();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuffer sb = new StringBuffer();
+    sb.append("Running: ");
+    sb.append(runningExecutors.toString());
+    sb.append("/ Failed: ");
+    sb.append(failedExecutors.toString());
+    sb.append("/ Completed: ");
+    sb.append(completedExecutors.toString());
+    return sb.toString();
+  }
+
+  /**
+   * @param executorId the executor id
+   * @return the corresponding {@link ExecutorRepresenter}, whether running or failed
+   * @throws NoSuchExecutorException when the executor was not found
+   */
+  @Nonnull
+  public ExecutorRepresenter getExecutorRepresenter(final String executorId) throws NoSuchExecutorException {
+    try {
+      return getRunningExecutorRepresenter(executorId);
+    } catch (final NoSuchExecutorException e) {
+      return getFailedExecutorRepresenter(executorId);
+    }
+  }
+
+  /**
+   * @param executorId the executor id
+   * @return the corresponding running {@link ExecutorRepresenter}
+   * @throws NoSuchExecutorException when the executor was not found
+   */
+  @Nonnull
+  public ExecutorRepresenter getRunningExecutorRepresenter(final String executorId) throws NoSuchExecutorException {
+    final ExecutorRepresenter representer = runningExecutors.get(executorId);
+    if (representer == null) {
+      throw new NoSuchExecutorException(executorId);
+    }
+    return representer;
+  }
+
+  /**
+   * @param executorId the executor id
+   * @return the corresponding failed {@link ExecutorRepresenter}
+   * @throws NoSuchExecutorException when the executor was not found
+   */
+  @Nonnull
+  public ExecutorRepresenter getFailedExecutorRepresenter(final String executorId) throws NoSuchExecutorException {
+    final ExecutorRepresenter representer = failedExecutors.get(executorId);
+    if (representer == null) {
+      throw new NoSuchExecutorException(executorId);
+    }
+    return representer;
+  }
+
+  /**
+   * Returns a {@link Set} of running executor ids in the registry.
+   * Note that the set is not modifiable, and further changes in the registry will not be reflected in the set.
+   * @return a {@link Set} of executor ids for running executors in the registry
+   */
+  public Set<String> getRunningExecutorIds() {
+    return Collections.unmodifiableSet(new TreeSet<>(runningExecutors.keySet()));
+  }
+
+  /**
+   * Adds an executor representer.
+   * @param representer the {@link ExecutorRepresenter} to register.
+   * @throws DuplicateExecutorIdException on multiple attempts to register the same representer,
+   *         or different representers with the same executor id.
+   */
+  public void registerRepresenter(final ExecutorRepresenter representer) throws DuplicateExecutorIdException {
+    final String executorId = representer.getExecutorId();
+    if (failedExecutors.get(executorId) != null) {
+      throw new DuplicateExecutorIdException(executorId);
+    }
+    runningExecutors.compute(executorId, (id, existingRepresenter) -> {
+      if (existingRepresenter != null) {
+        throw new DuplicateExecutorIdException(id);
+      }
+      return representer;
+    });
+  }
+
+  /**
+   * Moves the representer into the pool of failed executors.
+   * @param executorId the corresponding executor id
+   * @throws NoSuchExecutorException when the specified executor id is not registered, or already set as failed
+   */
+  public void setRepresenterAsFailed(final String executorId) throws NoSuchExecutorException {
+    final ExecutorRepresenter representer = runningExecutors.remove(executorId);
+    if (representer == null) {
+      throw new NoSuchExecutorException(executorId);
+    }
+    failedExecutors.put(executorId, representer);
+  }
+
+  /**
+   * Moves the representer into the pool of completed executors.
+   * @param executorId the corresponding executor id
+   * @throws NoSuchExecutorException when the specified executor id is not registered as running
+   */
+  public void setRepresenterAsCompleted(final String executorId) throws NoSuchExecutorException {
+    final ExecutorRepresenter representer = runningExecutors.remove(executorId);
+    if (representer == null) {
+      throw new NoSuchExecutorException(executorId);
+    }
+    if (failedExecutors.containsKey(executorId)) {
+      throw new IllegalStateException(executorId + " is in " + failedExecutors);
+    }
+    if (completedExecutors.containsKey(executorId)) {
+      throw new IllegalStateException(executorId + " is already in " + completedExecutors);
+    }
+
+    completedExecutors.put(executorId, representer);
+  }
+
+  /**
+   * Exception that indicates multiple attempts to register executors with the same executor id.
+   */
+  public final class DuplicateExecutorIdException extends RuntimeException {
+    private final String executorId;
+
+    /**
+     * @param executorId the executor id that caused this exception
+     */
+    public DuplicateExecutorIdException(final String executorId) {
+      super(String.format("Duplicate executorId: %s", executorId));
+      this.executorId = executorId;
+    }
+
+    /**
+     * @return the executor id for this exception
+     */
+    public String getExecutorId() {
+      return executorId;
+    }
+  }
+
+  /**
+   * Exception that indicates no executor for the specified executorId.
+   */
+  public final class NoSuchExecutorException extends RuntimeException {
+    private final String executorId;
+
+    /**
+     * @param executorId the executor id that caused this exception
+     */
+    public NoSuchExecutorException(final String executorId) {
+      super(String.format("No such executor: %s", executorId));
+      this.executorId = executorId;
+    }
+
+    /**
+     * @return the executor id for this exception
+     */
+    public String getExecutorId() {
+      return executorId;
+    }
+  }
+}
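
The new registry keeps executors in three disjoint pools and only allows the transitions running -> failed and running -> completed. Below is a minimal sketch of that lifecycle under simplified types; ExecutorLifecycleSketch and its plain IllegalStateExceptions are stand-ins for the registry's own exception classes, not part of the commit:

    import java.util.HashMap;
    import java.util.Map;

    /** Illustrative sketch of the running/failed/completed executor lifecycle. */
    final class ExecutorLifecycleSketch {
      private final Map<String, String> running = new HashMap<>();   // executorId -> node name
      private final Map<String, String> failed = new HashMap<>();
      private final Map<String, String> completed = new HashMap<>();

      void register(final String executorId, final String nodeName) {
        if (running.containsKey(executorId) || failed.containsKey(executorId)) {
          throw new IllegalStateException("Duplicate executorId: " + executorId);
        }
        running.put(executorId, nodeName);
      }

      void markFailed(final String executorId) {
        final String nodeName = running.remove(executorId);
        if (nodeName == null) {
          throw new IllegalStateException("No such running executor: " + executorId);
        }
        failed.put(executorId, nodeName);
      }

      void markCompleted(final String executorId) {
        final String nodeName = running.remove(executorId);
        if (nodeName == null) {
          throw new IllegalStateException("No such running executor: " + executorId);
        }
        completed.put(executorId, nodeName);
      }

      public static void main(final String[] args) {
        final ExecutorLifecycleSketch registry = new ExecutorLifecycleSketch();
        registry.register("Executor-1", "node-a");
        registry.register("Executor-2", "node-b");
        registry.markFailed("Executor-1");     // e.g., container failure
        registry.markCompleted("Executor-2");  // e.g., graceful shutdown at job termination
      }
    }
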
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index cde2670..0db67ad 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -15,13 +15,13 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.Parameter;
@@ -49,10 +49,10 @@ import java.util.stream.Collectors;
 public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
   private static final Logger LOG = LoggerFactory.getLogger(RoundRobinSchedulingPolicy.class.getName());
 
-  private final ContainerManager containerManager;
-
   private final int scheduleTimeoutMs;
 
+  private final ExecutorRegistry executorRegistry;
+
   /**
    * Thread safety is provided by this lock as multiple threads can call the methods in this class concurrently.
    */
@@ -71,25 +71,19 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
   private final Map<String, List<String>> executorIdByContainerType;
 
   /**
-   * A copy of {@link ContainerManager#executorRepresenterMap}.
-   * This cached copy is updated when an executor is added or removed.
-   */
-  private final Map<String, ExecutorRepresenter> executorRepresenterMap;
-
-  /**
    * The index of the next executor to be assigned for each container type.
    * This map allows the executor index computation of the RR scheduling.
    */
   private final Map<String, Integer> nextExecutorIndexByContainerType;
 
   @Inject
-  public RoundRobinSchedulingPolicy(final ContainerManager containerManager,
+  @VisibleForTesting
+  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry,
                                     @Parameter(JobConf.SchedulerTimeoutMs.class) final int scheduleTimeoutMs) {
-    this.containerManager = containerManager;
     this.scheduleTimeoutMs = scheduleTimeoutMs;
+    this.executorRegistry = executorRegistry;
     this.lock = new ReentrantLock();
     this.executorIdByContainerType = new HashMap<>();
-    this.executorRepresenterMap = new HashMap<>();
     this.conditionByContainerType = new HashMap<>();
     this.nextExecutorIndexByContainerType = new HashMap<>();
     initializeContainerTypeIfAbsent(ExecutorPlacementProperty.NONE); // Need this to avoid potential null errors
@@ -133,6 +127,91 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
     }
   }
 
+  @Override
+  public void onExecutorAdded(final ExecutorRepresenter executor) {
+    lock.lock();
+    try {
+      executorRegistry.registerRepresenter(executor);
+      final String containerType = executor.getContainerType();
+      initializeContainerTypeIfAbsent(containerType);
+
+      executorIdByContainerType.get(containerType)
+          .add(nextExecutorIndexByContainerType.get(containerType), executor.getExecutorId());
+      signalPossiblyWaitingScheduler(containerType);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public Set<String> onExecutorRemoved(final String executorId) {
+    lock.lock();
+    try {
+      executorRegistry.setRepresenterAsFailed(executorId);
+      final ExecutorRepresenter executor = executorRegistry.getFailedExecutorRepresenter(executorId);
+      executor.onExecutorFailed();
+
+      final String containerType = executor.getContainerType();
+
+      final List<String> executorIdList = executorIdByContainerType.get(containerType);
+      int nextExecutorIndex = nextExecutorIndexByContainerType.get(containerType);
+
+      final int executorAssignmentLocation = executorIdList.indexOf(executorId);
+      if (executorAssignmentLocation < nextExecutorIndex) {
+        nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex - 1);
+      } else if (executorAssignmentLocation == nextExecutorIndex) {
+        nextExecutorIndexByContainerType.put(containerType, 0);
+      }
+      executorIdList.remove(executorId);
+
+      return Collections.unmodifiableSet(executor.getFailedTaskGroups());
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void onTaskGroupExecutionComplete(final String executorId, final String taskGroupId) {
+    lock.lock();
+    try {
+      final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
+      executor.onTaskGroupExecutionComplete(taskGroupId);
+      LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
+
+      // the scheduler thread may be waiting for a free slot...
+      final String containerType = executor.getContainerType();
+      signalPossiblyWaitingScheduler(containerType);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void onTaskGroupExecutionFailed(final String executorId, final String taskGroupId) {
+    lock.lock();
+    try {
+      final ExecutorRepresenter executor = executorRegistry.getExecutorRepresenter(executorId);
+
+      executor.onTaskGroupExecutionFailed(taskGroupId);
+      LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
+
+      // the scheduler thread may be waiting for a free slot...
+      final String containerType = executor.getContainerType();
+      signalPossiblyWaitingScheduler(containerType);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void terminate() {
+    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
+      final ExecutorRepresenter representer = executorRegistry.getRunningExecutorRepresenter(executorId);
+      representer.shutDown();
+      executorRegistry.setRepresenterAsCompleted(executorId);
+    }
+  }
+
   /**
    * Sticks to the RR policy to select an executor for the next task group.
    * It checks the task groups running (as compared to each executor's capacity).
@@ -152,7 +231,7 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
         final int index = (nextExecutorIndex + i) % numExecutors;
         final String selectedExecutorId = candidateExecutorIds.get(index);
 
-        final ExecutorRepresenter executor = executorRepresenterMap.get(selectedExecutorId);
+        final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(selectedExecutorId);
         if (hasFreeSlot(executor)) {
           nextExecutorIndex = (index + 1) % numExecutors;
           nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex);
@@ -176,7 +255,7 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
                                  final JobStateManager jobStateManager) {
     jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
 
-    final ExecutorRepresenter executor = executorRepresenterMap.get(executorId);
+    final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
     LOG.info("Scheduling {} to {}",
         new Object[]{scheduledTaskGroup.getTaskGroupId(), executorId});
     executor.onTaskGroupScheduled(scheduledTaskGroup);
@@ -206,90 +285,4 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
       conditionByContainerType.get(ExecutorPlacementProperty.NONE).signal();
     }
   }
-
-  @Override
-  public void onExecutorAdded(final String executorId) {
-    lock.lock();
-    try {
-      updateCachedExecutorRepresenterMap();
-      final ExecutorRepresenter executor = executorRepresenterMap.get(executorId);
-      final String containerType = executor.getContainerType();
-      initializeContainerTypeIfAbsent(containerType);
-
-      executorIdByContainerType.get(containerType)
-          .add(nextExecutorIndexByContainerType.get(containerType), executorId);
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public Set<String> onExecutorRemoved(final String executorId) {
-    lock.lock();
-    try {
-      final ExecutorRepresenter executor = containerManager.getFailedExecutorRepresenterMap().get(executorId);
-      final String containerType = executor.getContainerType();
-
-      final List<String> executorIdList = executorIdByContainerType.get(containerType);
-      int nextExecutorIndex = nextExecutorIndexByContainerType.get(containerType);
-
-      final int executorAssignmentLocation = executorIdList.indexOf(executorId);
-      if (executorAssignmentLocation < nextExecutorIndex) {
-        nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex - 1);
-      } else if (executorAssignmentLocation == nextExecutorIndex) {
-        nextExecutorIndexByContainerType.put(containerType, 0);
-      }
-      executorIdList.remove(executorId);
-
-      updateCachedExecutorRepresenterMap();
-
-      return Collections.unmodifiableSet(executor.getRunningTaskGroups());
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  private void updateCachedExecutorRepresenterMap() {
-    executorRepresenterMap.clear();
-    executorRepresenterMap.putAll(containerManager.getExecutorRepresenterMap());
-  }
-
-  @Override
-  public void onTaskGroupExecutionComplete(final String executorId, final String taskGroupId) {
-    lock.lock();
-    try {
-      final ExecutorRepresenter executor = executorRepresenterMap.get(executorId);
-      executor.onTaskGroupExecutionComplete(taskGroupId);
-      LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
-
-      // the scheduler thread may be waiting for a free slot...
-      final String containerType = executor.getContainerType();
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
-  }
-
-  @Override
-  public void onTaskGroupExecutionFailed(final String executorId, final String taskGroupId) {
-    lock.lock();
-    try {
-      ExecutorRepresenter executor = executorRepresenterMap.get(executorId);
-
-      // When this method is called due to container failure and the executor has been moved to the failed map.
-      if (executor == null) {
-        executor = containerManager.getFailedExecutorRepresenterMap().get(executorId);
-      }
-
-      executor.onTaskGroupExecutionFailed(taskGroupId);
-      LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
-
-      // the scheduler thread may be waiting for a free slot...
-      final String containerType = executor.getContainerType();
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
-  }
 }
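
The round-robin policy above keeps a per-container-type cursor into its executor list and adjusts it when executors come and go. A compact sketch of just that index arithmetic, for a single container type (RoundRobinIndexSketch is an illustrative name, not part of the commit):

    import java.util.ArrayList;
    import java.util.List;

    /** Illustrative sketch of the round-robin cursor arithmetic for one container type. */
    final class RoundRobinIndexSketch {
      private final List<String> executorIds = new ArrayList<>();
      private int nextExecutorIndex = 0;

      void addExecutor(final String executorId) {
        // Insert at the cursor so a newly added executor is picked soon.
        executorIds.add(nextExecutorIndex, executorId);
      }

      /** Picks executors in a cycle, wrapping around with modulo arithmetic. */
      String pickNext() {
        final String picked = executorIds.get(nextExecutorIndex % executorIds.size());
        nextExecutorIndex = (nextExecutorIndex + 1) % executorIds.size();
        return picked;
      }

      void removeExecutor(final String executorId) {
        final int location = executorIds.indexOf(executorId);
        if (location < nextExecutorIndex) {
          nextExecutorIndex--;          // keep pointing at the same "next" executor
        } else if (location == nextExecutorIndex) {
          nextExecutorIndex = 0;        // the pointed-at executor is gone; start over
        }
        executorIds.remove(executorId);
      }

      public static void main(final String[] args) {
        final RoundRobinIndexSketch rr = new RoundRobinIndexSketch();
        rr.addExecutor("Executor-1");
        rr.addExecutor("Executor-2");
        rr.addExecutor("Executor-3");
        System.out.println(rr.pickNext());
        rr.removeExecutor("Executor-1");
        System.out.println(rr.pickNext());
      }
    }

Decrementing the cursor when an earlier executor is removed keeps the rotation pointing at the same next executor; resetting it to 0 when the pointed-at executor is removed avoids an out-of-range index.
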
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 5c6c862..65f44dc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -19,12 +19,17 @@ import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
 import javax.annotation.Nullable;
 
 /**
+ * Only two threads call scheduling code: the RuntimeMaster thread (RMT), and the SchedulerThread (ST).
+ * RMT and ST meet only at two points: SchedulingPolicy and PendingTaskGroupQueue, which are synchronized (ThreadSafe).
+ * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized (NotThreadSafe).
+ *
  * Receives jobs to execute and schedules
  * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup} to executors.
  */
@@ -51,9 +56,9 @@ public interface Scheduler {
 
   /**
    * Called when an executor is added to Runtime, so that the extra resource can be used to execute the job.
-   * @param executorId of the executor that has been added.
+   * @param executorRepresenter a representation of the added executor.
    */
-  void onExecutorAdded(String executorId);
+  void onExecutorAdded(ExecutorRepresenter executorRepresenter);
 
   /**
    * Called when an executor is removed from Runtime, so that faults related to the removal can be handled.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 577f44c..c6079d6 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -27,12 +27,14 @@ import java.util.concurrent.Executors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 
 /**
  * Takes a TaskGroup from the pending queue and schedules it to an executor.
  */
 @DriverSide
+@NotThreadSafe
 public final class SchedulerRunner {
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
@@ -48,12 +50,16 @@ public final class SchedulerRunner {
     this.jobStateManagers = new HashMap<>();
     this.pendingTaskGroupQueue = pendingTaskGroupQueue;
     this.schedulingPolicy = schedulingPolicy;
-    this.schedulerThread = Executors.newSingleThreadExecutor();
+    this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
     this.initialJobScheduled = false;
     this.isTerminated = false;
   }
 
-  public synchronized void scheduleJob(final JobStateManager jobStateManager) {
+  /**
+   * Begin scheduling a job.
+   * @param jobStateManager the corresponding {@link JobStateManager}
+   */
+  void scheduleJob(final JobStateManager jobStateManager) {
     if (!isTerminated) {
       jobStateManagers.put(jobStateManager.getJobId(), jobStateManager);
 
@@ -65,7 +71,8 @@ public final class SchedulerRunner {
     } // else ignore new incoming jobs when terminated.
   }
 
-  public synchronized void terminate() {
+  void terminate() {
+    schedulingPolicy.terminate();
     isTerminated = true;
   }
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 8a1145a..59f9adf 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -17,16 +17,19 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
+import javax.annotation.concurrent.ThreadSafe;
 import java.util.Set;
 
 /**
- * Defines the policy by which {@link BatchSingleJobScheduler} assigns task groups to executors.
+ * (WARNING) Implementations of this interface must be thread-safe.
  */
 @DriverSide
-@DefaultImplementation(RoundRobinSchedulingPolicy.class)
+@ThreadSafe
+@DefaultImplementation(SourceLocationAwareSchedulingPolicy.class)
 public interface SchedulingPolicy {
 
   /**
@@ -51,9 +54,9 @@ public interface SchedulingPolicy {
    * Unlocks this policy to schedule a next taskGroup if locked.
    * (Depending on the executor's resource type)
    *
-   * @param executorId for the executor that has been added.
+   * @param executorRepresenter the representation of the executor that has been added.
    */
-  void onExecutorAdded(String executorId);
+  void onExecutorAdded(ExecutorRepresenter executorRepresenter);
 
   /**
    * Deletes the executorId from the pool of available executors.
@@ -84,4 +87,9 @@ public interface SchedulingPolicy {
    * @param taskGroupId whose execution has completed.
    */
   void onTaskGroupExecutionFailed(String executorId, String taskGroupId);
+
+  /**
+   * Called when scheduling ends, to terminate this policy.
+   */
+  void terminate();
 }
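
Both SchedulingPolicy implementations in this commit block the scheduler thread on a Condition while no executor has a free slot, and wake it from onExecutorAdded and the TaskGroup completion/failure callbacks. A minimal sketch of that wait-with-a-time-budget pattern, independent of the Nemo classes (all names below are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.BooleanSupplier;

    /** Illustrative sketch of waiting for a schedulable executor under a fixed time budget. */
    final class TimedWaitSketch {
      private final Lock lock = new ReentrantLock();
      private final Condition moreExecutorsAvailable = lock.newCondition();

      /** Retries {@code attempt} until it succeeds or the time budget (in ms) is exhausted. */
      boolean tryWithinBudget(final BooleanSupplier attempt, final long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
          long remainingNanos = timeoutMs * 1_000_000L;
          while (remainingNanos > 0) {
            if (attempt.getAsBoolean()) {
              return true;
            }
            // A signal only means "something changed"; re-check the attempt after waking up.
            remainingNanos = moreExecutorsAvailable.awaitNanos(remainingNanos);
          }
          return false;  // budget exhausted; the caller may re-enqueue the TaskGroup
        } finally {
          lock.unlock();
        }
      }

      /** Called when an executor is added or a TaskGroup finishes, freeing a slot. */
      void onResourcesPossiblyAvailable() {
        lock.lock();
        try {
          moreExecutorsAvailable.signal();
        } finally {
          lock.unlock();
        }
      }

      public static void main(final String[] args) throws InterruptedException {
        final TimedWaitSketch sketch = new TimedWaitSketch();
        final AtomicBoolean slotFree = new AtomicBoolean(false);
        new Thread(() -> {               // simulate an executor becoming available later
          try {
            Thread.sleep(100);
          } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          slotFree.set(true);
          sketch.onResourcesPossiblyAvailable();
        }).start();
        System.out.println("scheduled: " + sketch.tryWithinBudget(slotFree::get, 2000));
      }
    }
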
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
new file mode 100644
index 0000000..e02c259
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.exception.SchedulingException;
+import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.annotations.audience.DriverSide;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.ThreadSafe;
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This policy is the same as {@link RoundRobinSchedulingPolicy}, except that for TaskGroups
+ * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors
+ * where the corresponding data resides.
+ */
+@ThreadSafe
+@DriverSide
+public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy {
+  private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
+
+  private final ExecutorRegistry executorRegistry;
+  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  private final long scheduleTimeoutMs;
+  private final Lock lock = new ReentrantLock();
+  private final Condition moreExecutorsAvailableCondition = lock.newCondition();
+
+  /**
+   * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}.
+   * @param executorRegistry provides the catalog of available executors
+   * @param roundRobinSchedulingPolicy provides the fallback for TaskGroups with no source location information
+   */
+  @Inject
+  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry executorRegistry,
+                                              final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy) {
+    this.executorRegistry = executorRegistry;
+    this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+    this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
+  }
+
+  @Override
+  public long getScheduleTimeoutMs() {
+    return scheduleTimeoutMs;
+  }
+
+  /**
+   * Try to schedule a TaskGroup.
+   * If the TaskGroup has one or more source tasks, this method schedules it to one of the physical nodes
+   * chosen from the union of the locations where the splits of each source task reside.
+   * If the TaskGroup has no source tasks, it falls back to {@link RoundRobinSchedulingPolicy}.
+   * @param scheduledTaskGroup to schedule.
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  @Override
+  public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
+                                   final JobStateManager jobStateManager) {
+    lock.lock();
+    try {
+      Set<String> sourceLocations = Collections.emptySet();
+      try {
+        sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+      } catch (final UnsupportedOperationException e) {
+        // do nothing
+      } catch (final Exception e) {
+        LOG.warn(String.format("Exception while trying to get source location for %s",
+            scheduledTaskGroup.getTaskGroupId()), e);
+      }
+      if (sourceLocations.size() == 0) {
+        // No source location information found, fall back to the RoundRobinSchedulingPolicy
+        return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, jobStateManager);
+      }
+
+      long timeoutInNanoseconds = scheduleTimeoutMs * 1000000;
+      while (timeoutInNanoseconds > 0) {
+        if (scheduleToLocalNode(scheduledTaskGroup, jobStateManager, sourceLocations)) {
+          return true;
+        }
+        try {
+          timeoutInNanoseconds = moreExecutorsAvailableCondition.awaitNanos(timeoutInNanoseconds);
+          // A signal on this condition does not necessarily guarantee that the added executor helps schedule the
+          // TaskGroup we are interested in. We need to await again if the subsequent scheduling attempt still fails,
+          // until we exhaust the specified time budget.
+        } catch (final InterruptedException e) {
+          throw new SchedulingException(e);
+        }
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Try to schedule a TaskGroup to a node where its source data resides.
+   * @param scheduledTaskGroup TaskGroup to schedule
+   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
+   * @param sourceLocations set of names of the nodes where the source data resides.
+   * @return true if the task group is successfully scheduled, false otherwise.
+   */
+  private boolean scheduleToLocalNode(final ScheduledTaskGroup scheduledTaskGroup,
+                                      final JobStateManager jobStateManager,
+                                      final Set<String> sourceLocations) {
+    lock.lock();
+    try {
+      final List<ExecutorRepresenter> candidateExecutors =
+          selectExecutorByContainerTypeAndNodeNames(scheduledTaskGroup.getContainerType(), sourceLocations);
+      if (candidateExecutors.size() == 0) {
+        return false;
+      }
+      final int randomIndex = ThreadLocalRandom.current().nextInt(0, candidateExecutors.size());
+      final ExecutorRepresenter selectedExecutor = candidateExecutors.get(randomIndex);
+
+      jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
+      selectedExecutor.onTaskGroupScheduled(scheduledTaskGroup);
+      LOG.info("Scheduling {} (source location: {}) to {} (node name: {})", scheduledTaskGroup.getTaskGroupId(),
+          String.join(", ", sourceLocations), selectedExecutor.getExecutorId(),
+          selectedExecutor.getNodeName());
+      return true;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
+    lock.lock();
+    try {
+      moreExecutorsAvailableCondition.signal();
+      roundRobinSchedulingPolicy.onExecutorAdded(executorRepresenter);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public Set<String> onExecutorRemoved(final String executorId) {
+    lock.lock();
+    try {
+      return roundRobinSchedulingPolicy.onExecutorRemoved(executorId);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void onTaskGroupExecutionComplete(final String executorId, final String taskGroupId) {
+    lock.lock();
+    try {
+      moreExecutorsAvailableCondition.signal();
+      roundRobinSchedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void onTaskGroupExecutionFailed(final String executorId, final String taskGroupId) {
+    lock.lock();
+    try {
+      moreExecutorsAvailableCondition.signal();
+      roundRobinSchedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public void terminate() {
+    lock.lock();
+    try {
+      roundRobinSchedulingPolicy.terminate();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @param containerType the desired container type
+   * @param nodeNames set of node names
+   * @return list of executors that reside in one of {@code nodeNames}, have the container type {@code containerType},
+   *         and have an empty slot for execution
+   */
+  private List<ExecutorRepresenter> selectExecutorByContainerTypeAndNodeNames(
+      final String containerType, final Set<String> nodeNames) {
+    lock.lock();
+    try {
+      final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = executorRegistry.getRunningExecutorIds().stream()
+          .map(executorId -> executorRegistry.getRunningExecutorRepresenter(executorId))
+          .filter(executor -> executor.getRunningTaskGroups().size() < executor.getExecutorCapacity())
+          .filter(executor -> nodeNames.contains(executor.getNodeName()));
+      if (containerType.equals(ExecutorPlacementProperty.NONE)) {
+        return localNodesWithSpareCapacity.collect(Collectors.toList());
+      } else {
+        return localNodesWithSpareCapacity.filter(executor -> executor.getContainerType().equals(containerType))
+            .collect(Collectors.toList());
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * @param readables collection of readables
+   * @return set of source locations for the given {@code readables}
+   * @throws Exception any exception raised while querying the source locations of a readable
+   */
+  private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
+    final List<String> sourceLocations = new ArrayList<>();
+    for (final Readable readable : readables) {
+      sourceLocations.addAll(readable.getLocations());
+    }
+    return new HashSet<>(sourceLocations);
+  }
+}
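
Stripped of locking and fallback, the selection logic added above reduces to: take the union of each Readable's locations, keep executors that sit on one of those nodes, match the container type, and still have a free slot, then pick one at random. A self-contained sketch of that filtering step under simplified types (all names below are illustrative, and an empty string stands in for the NONE container type):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.stream.Collectors;

    /** Illustrative sketch of picking an executor co-located with the source data. */
    final class SourceLocationSelectionSketch {
      /** Simplified executor view: id, node name, container type, and whether a slot is free. */
      static final class Executor {
        final String id;
        final String nodeName;
        final String containerType;
        final boolean hasFreeSlot;

        Executor(final String id, final String nodeName, final String containerType, final boolean hasFreeSlot) {
          this.id = id;
          this.nodeName = nodeName;
          this.containerType = containerType;
          this.hasFreeSlot = hasFreeSlot;
        }
      }

      /** Returns a random candidate on a source-location node, or null if there is none. */
      static Executor pick(final List<Executor> executors, final Set<String> sourceLocations,
                           final String containerType) {
        final List<Executor> candidates = executors.stream()
            .filter(e -> e.hasFreeSlot)
            .filter(e -> sourceLocations.contains(e.nodeName))
            .filter(e -> containerType.isEmpty() || e.containerType.equals(containerType))
            .collect(Collectors.toList());
        if (candidates.isEmpty()) {
          return null;  // the caller would wait for more executors or fall back to round-robin
        }
        return candidates.get(ThreadLocalRandom.current().nextInt(candidates.size()));
      }

      public static void main(final String[] args) {
        final List<Executor> executors = Arrays.asList(
            new Executor("Executor-1", "node-a", "compute", true),
            new Executor("Executor-2", "node-b", "compute", true));
        final Set<String> sourceLocations = new HashSet<>(Arrays.asList("node-b"));
        System.out.println(pick(executors, sourceLocations, "compute").id);  // prints Executor-2
      }
    }
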
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
index 40e5018..b4f411a 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
@@ -15,16 +15,11 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
@@ -32,10 +27,6 @@ import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -43,83 +34,62 @@ import java.util.stream.IntStream;
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  private static ExecutorService completionEventThreadPool;
-  private static BlockingDeque<Runnable> eventRunnableQueue;
-  private static boolean testComplete;
-
-  public static void initialize() {
-    testComplete = false;
-    completionEventThreadPool = Executors.newFixedThreadPool(5);
-
-    eventRunnableQueue = new LinkedBlockingDeque<>();
-
-    for (int i = 0; i < 5; i++) {
-      completionEventThreadPool.execute(() -> {
-        while (!testComplete || !eventRunnableQueue.isEmpty()) {
-          try {
-            final Runnable event = eventRunnableQueue.takeFirst();
-            event.run();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-      });
-    }
-    completionEventThreadPool.shutdown();
-  }
-
-  public static void cleanup() {
-    testComplete = true;
-  }
-
   /**
-   * Sends a stage's completion event to scheduler, with all its task groups marked as complete as well.
-   * This replaces executor's task group completion messages for testing purposes.
+   * Completes the stage by marking all of its TaskGroups as complete.
    * @param jobStateManager for the submitted job.
    * @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides the executor representers.
    * @param physicalStage for which the states should be marked as complete.
    */
-  public static void sendStageCompletionEventToScheduler(final JobStateManager jobStateManager,
-                                                         final Scheduler scheduler,
-                                                         final ContainerManager containerManager,
-                                                         final PhysicalStage physicalStage,
-                                                         final int attemptIdx) {
-    eventRunnableQueue.add(new Runnable() {
-      @Override
-      public void run() {
-        while (jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
-            == StageState.State.EXECUTING) {
-          physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-            if (jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
-                == TaskGroupState.State.EXECUTING) {
-              sendTaskGroupStateEventToScheduler(scheduler, containerManager, taskGroupId,
-                  TaskGroupState.State.COMPLETE, attemptIdx, null);
-            }
-          });
-        }
+  public static void completeStage(final JobStateManager jobStateManager,
+                                   final Scheduler scheduler,
+                                   final ExecutorRegistry executorRegistry,
+                                   final PhysicalStage physicalStage,
+                                   final int attemptIdx) {
+    // Loop until the stage completes.
+    while (true) {
+      final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState();
+      if (StageState.State.COMPLETE == stageState) {
+        // Stage has completed, so we break out of the loop.
+        break;
+      } else if (StageState.State.EXECUTING == stageState) {
+        physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
+          final Enum tgState = jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState();
+          if (TaskGroupState.State.EXECUTING == tgState) {
+            sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId,
+                TaskGroupState.State.COMPLETE, attemptIdx, null);
+          } else if (TaskGroupState.State.READY == tgState || TaskGroupState.State.COMPLETE == tgState) {
+            // Skip READY (try in the next loop and see if it becomes EXECUTING) and COMPLETE.
+          } else {
+            throw new IllegalStateException(tgState.toString());
+          }
+        });
+      } else if (StageState.State.READY == stageState) {
+        // Skip and retry in the next loop.
+      } else {
+        throw new IllegalStateException(stageState.toString());
       }
-    });
+    }
   }
 
   /**
    * Sends task group state change event to scheduler.
    * This replaces executor's task group completion messages for testing purposes.
    * @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides the executor representers.
    * @param taskGroupId for the task group to change the state.
    * @param newState for the task group.
    * @param cause in the case of a recoverable failure.
    */
   public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler,
-                                                        final ContainerManager containerManager,
+                                                        final ExecutorRegistry executorRegistry,
                                                         final String taskGroupId,
                                                         final TaskGroupState.State newState,
                                                         final int attemptIdx,
                                                         final TaskGroupState.RecoverableFailureCause cause) {
     ExecutorRepresenter scheduledExecutor;
     do {
-      scheduledExecutor = findExecutorForTaskGroup(containerManager, taskGroupId);
+      scheduledExecutor = findExecutorForTaskGroup(executorRegistry, taskGroupId);
     } while (scheduledExecutor == null);
 
     scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), taskGroupId,
@@ -127,11 +97,11 @@ public final class RuntimeTestUtil {
   }
 
   public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler,
-                                                        final ContainerManager containerManager,
+                                                        final ExecutorRegistry executorRegistry,
                                                         final String taskGroupId,
                                                         final TaskGroupState.State newState,
                                                         final int attemptIdx) {
-    sendTaskGroupStateEventToScheduler(scheduler, containerManager, taskGroupId, newState, attemptIdx, null);
+    sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, newState, attemptIdx, null);
   }
 
   public static void mockSchedulerRunner(final PendingTaskGroupQueue pendingTaskGroupQueue,
@@ -151,88 +121,15 @@ public final class RuntimeTestUtil {
   }
 
   /**
-   * Sends partition state changes of a stage to PartitionManager.
-   * This replaces executor's partition state messages for testing purposes.
-   * @param blockManagerMaster used for testing purposes.
-   * @param containerManager used for testing purposes.
-   * @param stageOutgoingEdges to infer partition IDs.
-   * @param physicalStage to change the state.
-   * @param newState for the task group.
-   */
-  public static void sendPartitionStateEventForAStage(final BlockManagerMaster blockManagerMaster,
-                                                      final ContainerManager containerManager,
-                                                      final List<PhysicalStageEdge> stageOutgoingEdges,
-                                                      final PhysicalStage physicalStage,
-                                                      final BlockState.State newState) {
-    eventRunnableQueue.add(new Runnable() {
-      @Override
-      public void run() {
-                // Initialize states for blocks of inter-stage edges
-        stageOutgoingEdges.forEach(physicalStageEdge -> {
-          final int srcParallelism = physicalStage.getTaskGroupIds().size();
-          IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
-            final String partitionId =
-                RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(), srcTaskIdx);
-              sendPartitionStateEventToPartitionManager(blockManagerMaster, containerManager, partitionId, newState);
-          });
-        });
-
-        // Initialize states for blocks of stage internal edges
-        physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-          final DAG<Task, RuntimeEdge<Task>> taskGroupInternalDag = physicalStage.getTaskGroupDag();
-          taskGroupInternalDag.getVertices().forEach(task -> {
-            final List<RuntimeEdge<Task>> internalOutgoingEdges = taskGroupInternalDag.getOutgoingEdgesOf(task);
-            internalOutgoingEdges.forEach(taskRuntimeEdge -> {
-              final String partitionId =
-                  RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(),
-                      RuntimeIdGenerator.getIndexFromTaskGroupId(taskGroupId));
-              sendPartitionStateEventToPartitionManager(blockManagerMaster, containerManager, partitionId, newState);
-            });
-          });
-        });
-      }
-    });
-  }
-
-  /**
-   * Sends partition state change event to PartitionManager.
-   * This replaces executor's partition state messages for testing purposes.
-   * @param blockManagerMaster used for testing purposes.
-   * @param containerManager used for testing purposes.
-   * @param partitionId for the partition to change the state.
-   * @param newState for the task group.
-   */
-  public static void sendPartitionStateEventToPartitionManager(final BlockManagerMaster blockManagerMaster,
-                                                               final ContainerManager containerManager,
-                                                               final String partitionId,
-                                                               final BlockState.State newState) {
-    eventRunnableQueue.add(new Runnable() {
-      @Override
-      public void run() {
-        final Set<String> parentTaskGroupIds = blockManagerMaster.getProducerTaskGroupIds(partitionId);
-        if (!parentTaskGroupIds.isEmpty()) {
-          parentTaskGroupIds.forEach(taskGroupId -> {
-            final ExecutorRepresenter scheduledExecutor = findExecutorForTaskGroup(containerManager, taskGroupId);
-
-            if (scheduledExecutor != null) {
-              blockManagerMaster.onBlockStateChanged(
-                  partitionId, newState, scheduledExecutor.getExecutorId());
-            }
-          });
-        }
-      }
-    });
-  }
-
-  /**
    * Retrieves the executor to which the given task group was scheduled.
    * @param taskGroupId of the task group to search.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides the executor representers.
    * @return the {@link ExecutorRepresenter} of the executor the task group was scheduled to.
    */
-  private static ExecutorRepresenter findExecutorForTaskGroup(final ContainerManager containerManager,
+  private static ExecutorRepresenter findExecutorForTaskGroup(final ExecutorRegistry executorRegistry,
                                                               final String taskGroupId) {
-    for (final ExecutorRepresenter executor : containerManager.getExecutorRepresenterMap().values()) {
+    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
+      final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
       if (executor.getRunningTaskGroups().contains(taskGroupId)
           || executor.getCompleteTaskGroups().contains(taskGroupId)) {
         return executor;
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
index 0209670..ee25ec3 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
@@ -128,6 +128,10 @@ public final class TaskGroupExecutorTest {
       public Iterable read() throws Exception {
         return elements;
       }
+      @Override
+      public List<String> getLocations() {
+        throw new UnsupportedOperationException();
+      }
     };
     final Map<String, Readable> logicalIdToReadable = new HashMap<>();
     logicalIdToReadable.put(sourceTaskId, readable);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index b536372..70379ee 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -52,10 +52,12 @@ import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.runtime.master.RuntimeMaster;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.scheduler.*;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.commons.io.FileUtils;
+import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.io.network.naming.NameResolverConfiguration;
 import org.apache.reef.io.network.naming.NameServer;
 import org.apache.reef.io.network.util.StringIdentifierFactory;
@@ -122,11 +124,19 @@ public final class DataTransferTest {
     final LocalMessageDispatcher messageDispatcher = new LocalMessageDispatcher();
     final LocalMessageEnvironment messageEnvironment =
         new LocalMessageEnvironment(MessageEnvironment.MASTER_COMMUNICATION_ID, messageDispatcher);
-    final ContainerManager containerManager = new ContainerManager(1, null, messageEnvironment);
+    final Configuration configuration = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(JobConf.ScheduleSerThread.class, "1")
+        .build();
+    final Injector injector = Tang.Factory.getTang().newInjector(configuration);
+    injector.bindVolatileInstance(EvaluatorRequestor.class, mock(EvaluatorRequestor.class));
+    injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
+    final ContainerManager containerManager = injector.getInstance(ContainerManager.class);
+
     final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
     final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
-    final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy(containerManager, SCHEDULE_TIMEOUT);
+    final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy(
+        injector.getInstance(ExecutorRegistry.class), SCHEDULE_TIMEOUT);
     final PendingTaskGroupQueue taskGroupQueue = new SingleJobTaskGroupQueue();
     final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskGroupQueue);
     final Scheduler scheduler =
@@ -136,8 +146,8 @@ public final class DataTransferTest {
 
     // Necessary for wiring up the message environments
     final RuntimeMaster runtimeMaster =
-        new RuntimeMaster(scheduler, schedulerRunner, taskGroupQueue,
-            containerManager, master, metricMessageHandler, messageEnvironment, EMPTY_DAG_DIRECTORY);
+        new RuntimeMaster(scheduler, containerManager, master,
+            metricMessageHandler, messageEnvironment, EMPTY_DAG_DIRECTORY);
 
     final Injector injector1 = Tang.Factory.getTang().newInjector();
     injector1.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
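
For readers unfamiliar with the test change above: ContainerManager is now obtained from a Tang injector, with configuration values bound via bindNamedParameter and pre-built (mocked) collaborators supplied via bindVolatileInstance, instead of being constructed directly. Below is a rough, self-contained sketch of that injection pattern; TangInjectionSketch, NumThreads, Clock and Component are made-up names for illustration, and the sketch assumes only the REEF Tang annotations and builder calls already used in this diff.

import javax.inject.Inject;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.InjectionException;

public final class TangInjectionSketch {
  // A configuration value, analogous in spirit to JobConf.ScheduleSerThread.
  @NamedParameter(doc = "Number of threads", default_value = "1")
  public static final class NumThreads implements Name<Integer> {
  }

  // A collaborator that the test would normally supply as a pre-built (possibly mocked) instance.
  public interface Clock {
    long now();
  }

  public static final class Component {
    private final int numThreads;
    private final Clock clock;

    @Inject
    private Component(@Parameter(NumThreads.class) final int numThreads, final Clock clock) {
      this.numThreads = numThreads;
      this.clock = clock;
    }

    public String describe() {
      return numThreads + " thread(s), now=" + clock.now();
    }
  }

  public static void main(final String[] args) throws InjectionException {
    final Configuration configuration = Tang.Factory.getTang().newConfigurationBuilder()
        .bindNamedParameter(NumThreads.class, "2")
        .build();
    final Injector injector = Tang.Factory.getTang().newInjector(configuration);
    injector.bindVolatileInstance(Clock.class, () -> 42L);   // stand-in for a Mockito mock
    final Component component = injector.getInstance(Component.class);
    System.out.println(component.describe());                // prints: 2 thread(s), now=42
  }
}
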
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java
index c0a6c3a..b23c233 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/ContainerManagerTest.java
@@ -16,12 +16,19 @@
 package edu.snu.nemo.tests.runtime.master;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
+import org.apache.reef.driver.catalog.NodeDescriptor;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.driver.evaluator.AllocatedEvaluator;
+import org.apache.reef.driver.evaluator.EvaluatorDescriptor;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -45,11 +52,17 @@ public final class ContainerManagerTest {
   private final int DEFAULT_MEMORY = 10240;
 
   @Before
-  public void setUp() {
+  public void setUp() throws InjectionException {
 
     final MessageEnvironment mockMsgEnv = mock(MessageEnvironment.class);
     when(mockMsgEnv.asyncConnect(anyString(), anyString())).thenReturn(mock(Future.class));
-    containerManager = new ContainerManager(1, mock(EvaluatorRequestor.class), mockMsgEnv);
+    final Configuration configuration = Tang.Factory.getTang().newConfigurationBuilder()
+        .bindNamedParameter(JobConf.ScheduleSerThread.class, "1")
+        .build();
+    final Injector injector = Tang.Factory.getTang().newInjector(configuration);
+    injector.bindVolatileInstance(EvaluatorRequestor.class, mock(EvaluatorRequestor.class));
+    injector.bindVolatileInstance(MessageEnvironment.class, mockMsgEnv);
+    containerManager = injector.getInstance(ContainerManager.class);
   }
 
   @Test(timeout=5000)
@@ -72,21 +85,6 @@ public final class ContainerManagerTest {
     allocateResource(createMockContext());
     allocateResource(createMockContext());
     allocateResource(createMockContext());
-
-    // Say the job finishes,
-    // and we would like to shutdown the running executors and terminate ContainerManager.
-    final Future<Boolean> terminationResult = containerManager.terminate();
-
-    // But say, the 5th container and executor was only allocated by this point.
-    allocateResource(createMockContext());
-
-    try {
-      assertTrue(terminationResult.get());
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    } catch (ExecutionException e) {
-      e.printStackTrace();
-    }
   }
 
   private AllocatedEvaluator createMockEvaluator() {
@@ -94,14 +92,20 @@ public final class ContainerManagerTest {
   }
 
   private ActiveContext createMockContext() {
+    final String name = "TestContext" + testIdNumber++;
+    final NodeDescriptor mockedNodeDescriptor = mock(NodeDescriptor.class);
+    when(mockedNodeDescriptor.getName()).thenReturn(name);
+    final EvaluatorDescriptor mockedEvaluatorDescriptor = mock(EvaluatorDescriptor.class);
+    when(mockedEvaluatorDescriptor.getNodeDescriptor()).thenReturn(mockedNodeDescriptor);
     final ActiveContext mockedContext = mock(ActiveContext.class);
-    when(mockedContext.getId()).thenReturn("TestContext" + testIdNumber++);
+    when(mockedContext.getId()).thenReturn(name);
+    when(mockedContext.getEvaluatorDescriptor()).thenReturn(mockedEvaluatorDescriptor);
 
     return mockedContext;
   }
 
   private void allocateResource(final ActiveContext mockContext) {
     containerManager.onContainerAllocated(mockContext.getId(), createMockEvaluator(), null);
-    containerManager.onExecutorLaunched(mockContext);
+    containerManager.onContainerLaunched(mockContext);
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index d6e2a6c..9afe504 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -29,6 +29,7 @@ import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.tests.runtime.RuntimeTestUtil;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
@@ -61,10 +62,10 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
 
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests {@link BatchSingleJobScheduler}.
@@ -78,7 +79,7 @@ public final class BatchSingleJobSchedulerTest {
   private Scheduler scheduler;
   private SchedulingPolicy schedulingPolicy;
   private SchedulerRunner schedulerRunner;
-  private ContainerManager containerManager;
+  private ExecutorRegistry executorRegistry;
   private MetricMessageHandler metricMessageHandler;
   private PendingTaskGroupQueue pendingTaskGroupQueue;
   private PubSubEventHandlerWrapper pubSubEventHandler;
@@ -89,17 +90,21 @@ public final class BatchSingleJobSchedulerTest {
 
   private static final int TEST_TIMEOUT_MS = 500;
 
+  private static final int EXECUTOR_CAPACITY = 20;
+
   // This schedule index will make sure that task group events are not ignored
   private static final int MAGIC_SCHEDULE_ATTEMPT_INDEX = Integer.MAX_VALUE;
 
   @Before
   public void setUp() throws Exception {
-    RuntimeTestUtil.initialize();
-    irDAGBuilder = new DAGBuilder<>();
-    containerManager = mock(ContainerManager.class);
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
+
+    irDAGBuilder = initializeDAGBuilder();
+    executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
     pendingTaskGroupQueue = new SingleJobTaskGroupQueue();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(containerManager, TEST_TIMEOUT_MS);
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, TEST_TIMEOUT_MS);
     schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupQueue);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
@@ -107,44 +112,36 @@ public final class BatchSingleJobSchedulerTest {
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupQueue,
             blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler);
 
-    final Map<String, ExecutorRepresenter> executorRepresenterMap = new HashMap<>();
-    when(containerManager.getExecutorRepresenterMap()).thenReturn(executorRepresenterMap);
-
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
     final ExecutorService serializationExecutorService = Executors.newSingleThreadExecutor();
-    final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 1, 0);
-    final ExecutorRepresenter a3 =
-        new ExecutorRepresenter("a3", computeSpec, mockMsgSender, activeContext, serializationExecutorService);
-    final ExecutorRepresenter a2 =
-        new ExecutorRepresenter("a2", computeSpec, mockMsgSender, activeContext, serializationExecutorService);
-    final ExecutorRepresenter a1 =
-        new ExecutorRepresenter("a1", computeSpec, mockMsgSender, activeContext, serializationExecutorService);
-
-    final ResourceSpecification storageSpec = new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, 1, 0);
-    final ExecutorRepresenter b2 =
-        new ExecutorRepresenter("b2", storageSpec, mockMsgSender, activeContext, serializationExecutorService);
-    final ExecutorRepresenter b1 =
-        new ExecutorRepresenter("b1", storageSpec, mockMsgSender, activeContext, serializationExecutorService);
-
-    executorRepresenterMap.put(a1.getExecutorId(), a1);
-    executorRepresenterMap.put(a2.getExecutorId(), a2);
-    executorRepresenterMap.put(a3.getExecutorId(), a3);
-    executorRepresenterMap.put(b1.getExecutorId(), b1);
-    executorRepresenterMap.put(b2.getExecutorId(), b2);
+    final ResourceSpecification computeSpec =
+        new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, EXECUTOR_CAPACITY, 0);
+    final Function<String, ExecutorRepresenter> computeSpecExecutorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serializationExecutorService,
+            executorId);
+    final ExecutorRepresenter a3 = computeSpecExecutorRepresenterGenerator.apply("a3");
+    final ExecutorRepresenter a2 = computeSpecExecutorRepresenterGenerator.apply("a2");
+    final ExecutorRepresenter a1 = computeSpecExecutorRepresenterGenerator.apply("a1");
+
+    final ResourceSpecification storageSpec =
+        new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, EXECUTOR_CAPACITY, 0);
+    final Function<String, ExecutorRepresenter> storageSpecExecutorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, storageSpec, mockMsgSender, activeContext, serializationExecutorService,
+            executorId);
+    final ExecutorRepresenter b2 = storageSpecExecutorRepresenterGenerator.apply("b2");
+    final ExecutorRepresenter b1 = storageSpecExecutorRepresenterGenerator.apply("b1");
 
     // Add compute nodes
-    scheduler.onExecutorAdded(a1.getExecutorId());
-    scheduler.onExecutorAdded(a2.getExecutorId());
-    scheduler.onExecutorAdded(a3.getExecutorId());
+    scheduler.onExecutorAdded(a1);
+    scheduler.onExecutorAdded(a2);
+    scheduler.onExecutorAdded(a3);
 
     // Add storage nodes
-    scheduler.onExecutorAdded(b1.getExecutorId());
-    scheduler.onExecutorAdded(b2.getExecutorId());
+    scheduler.onExecutorAdded(b1);
+    scheduler.onExecutorAdded(b2);
 
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    injector.bindVolatileParameter(JobConf.DAGDirectory.class, "");
     physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
@@ -154,48 +151,8 @@ public final class BatchSingleJobSchedulerTest {
    */
   @Test(timeout=10000)
   public void testPull() throws Exception {
-    // Build DAG
-    final Transform t = new EmptyComponents.EmptyTransform("empty");
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(3));
-    v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
-
-    final IRVertex v4 = new OperatorVertex(t);
-    v4.setProperty(ParallelismProperty.of(2));
-    v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v4);
-
-    final IRVertex v5 = new OperatorVertex(new DoTransform(null, null));
-    v5.setProperty(ParallelismProperty.of(2));
-    v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
-    irDAGBuilder.addVertex(v5);
-
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e4);
-
-    final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e5);
-
     final DAG<IRVertex, IREdge> pullIRDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
         new TestPolicy(), "");
-
     scheduleAndCheckJobTermination(pullIRDAG);
   }
 
@@ -205,50 +162,53 @@ public final class BatchSingleJobSchedulerTest {
    */
   @Test(timeout=10000)
   public void testPush() throws Exception {
-    // Build DAG
+    final DAG<IRVertex, IREdge> pushIRDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
+        new TestPolicy(true), "");
+    scheduleAndCheckJobTermination(pushIRDAG);
+  }
+
+  private DAGBuilder<IRVertex, IREdge> initializeDAGBuilder() {
+    final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+
     final Transform t = new EmptyComponents.EmptyTransform("empty");
     final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
+    v1.setProperty(ParallelismProperty.of(1));
     v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
+    dagBuilder.addVertex(v1);
 
     final IRVertex v2 = new OperatorVertex(t);
     v2.setProperty(ParallelismProperty.of(2));
     v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
+    dagBuilder.addVertex(v2);
 
     final IRVertex v3 = new OperatorVertex(t);
     v3.setProperty(ParallelismProperty.of(3));
     v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
+    dagBuilder.addVertex(v3);
 
     final IRVertex v4 = new OperatorVertex(t);
-    v4.setProperty(ParallelismProperty.of(2));
+    v4.setProperty(ParallelismProperty.of(4));
     v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v4);
+    dagBuilder.addVertex(v4);
 
     final IRVertex v5 = new OperatorVertex(new DoTransform(null, null));
-    v5.setProperty(ParallelismProperty.of(2));
+    v5.setProperty(ParallelismProperty.of(5));
     v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT));
-    irDAGBuilder.addVertex(v5);
+    dagBuilder.addVertex(v5);
 
     final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
+    dagBuilder.connectVertices(e1);
 
     final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
+    dagBuilder.connectVertices(e2);
 
     final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e4);
+    dagBuilder.connectVertices(e4);
 
     final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e5);
+    dagBuilder.connectVertices(e5);
 
-    final DAG<IRVertex, IREdge> pushIRDAG =
-        CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-
-    scheduleAndCheckJobTermination(pushIRDAG);
+    return dagBuilder;
   }
 
   private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> irDAG) throws InjectionException {
@@ -263,29 +223,41 @@ public final class BatchSingleJobSchedulerTest {
     // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete".
     for (int i = 0; i < getNumScheduleGroups(irDAG); i++) {
       final int scheduleGroupIdx = i;
-
-      final List<PhysicalStage> scheduleGroupStages = physicalDAG.filterVertices(physicalStage ->
-          physicalStage.getScheduleGroupIndex() == scheduleGroupIdx);
+      final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(physicalDAG, scheduleGroupIdx);
 
       LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx);
-      scheduleGroupStages.forEach(physicalStage -> {
+      stages.forEach(physicalStage -> {
         while (jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
             != StageState.State.EXECUTING) {
 
         }
       });
 
-      scheduleGroupStages.forEach(physicalStage ->
-          RuntimeTestUtil.sendStageCompletionEventToScheduler(
-              jobStateManager, scheduler, containerManager, physicalStage, MAGIC_SCHEDULE_ATTEMPT_INDEX));
+      stages.forEach(physicalStage -> {
+        RuntimeTestUtil.completeStage(
+            jobStateManager, scheduler, executorRegistry, physicalStage, MAGIC_SCHEDULE_ATTEMPT_INDEX);
+      });
     }
 
     LOG.debug("Waiting for job termination after sending stage completion events");
     while (!jobStateManager.checkJobTermination()) {
-
     }
     assertTrue(jobStateManager.checkJobTermination());
-    RuntimeTestUtil.cleanup();
+  }
+
+  private List<PhysicalStage> filterStagesWithAScheduleGroupIndex(
+      final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG, final int scheduleGroupIndex) {
+    final Set<PhysicalStage> stageSet = new HashSet<>(physicalDAG.filterVertices(
+        physicalStage -> physicalStage.getScheduleGroupIndex() == scheduleGroupIndex));
+
+    // Return the filtered stages in topological order
+    final List<PhysicalStage> sortedStages = new ArrayList<>(stageSet.size());
+    physicalDAG.topologicalDo(stage -> {
+      if (stageSet.contains(stage)) {
+        sortedStages.add(stage);
+      }
+    });
+    return sortedStages;
   }
 
   private int getNumScheduleGroups(final DAG<IRVertex, IREdge> irDAG) {
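
A note on filterStagesWithAScheduleGroupIndex above: it filters the physical DAG's stages down to one ScheduleGroup and returns them in topological order, so that stage-completion events can be fed to the scheduler in dependency order. The sketch below shows the same filter-then-order idea over a plain adjacency-list DAG; it does not use Nemo's DAG API, and the stage names are made up.

import java.util.*;

public final class TopologicalFilterSketch {

  // Kahn's algorithm over an adjacency list; edges map each vertex to its downstream vertices.
  static <V> List<V> topologicalOrder(final Map<V, List<V>> edges) {
    final Map<V, Integer> inDegree = new HashMap<>();
    edges.keySet().forEach(v -> inDegree.putIfAbsent(v, 0));
    edges.values().forEach(vs -> vs.forEach(v -> inDegree.merge(v, 1, Integer::sum)));
    final Deque<V> ready = new ArrayDeque<>();
    inDegree.forEach((v, d) -> { if (d == 0) ready.add(v); });
    final List<V> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      final V v = ready.poll();
      order.add(v);
      for (final V next : edges.getOrDefault(v, Collections.emptyList())) {
        if (inDegree.merge(next, -1, Integer::sum) == 0) {
          ready.add(next);
        }
      }
    }
    return order;
  }

  // Keeps only the vertices in keep, preserving their topological order.
  static <V> List<V> filterInTopologicalOrder(final Map<V, List<V>> edges, final Set<V> keep) {
    final List<V> result = new ArrayList<>();
    for (final V v : topologicalOrder(edges)) {
      if (keep.contains(v)) {
        result.add(v);
      }
    }
    return result;
  }

  public static void main(final String[] args) {
    final Map<String, List<String>> edges = new HashMap<>();
    edges.put("Stage1", Arrays.asList("Stage2", "Stage3"));
    edges.put("Stage2", Collections.singletonList("Stage4"));
    edges.put("Stage3", Collections.singletonList("Stage4"));
    edges.put("Stage4", Collections.emptyList());
    // Prints the kept stages in dependency order: [Stage2, Stage4]
    System.out.println(filterInTopologicalOrder(edges, new HashSet<>(Arrays.asList("Stage4", "Stage2"))));
  }
}
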
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
index cc6cfef..9d067b0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
@@ -29,6 +29,7 @@ import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.tests.runtime.RuntimeTestUtil;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
@@ -38,7 +39,6 @@ import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
 import edu.snu.nemo.runtime.master.scheduler.*;
@@ -46,6 +46,7 @@ import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import org.apache.reef.driver.context.ActiveContext;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -58,6 +59,7 @@ import org.slf4j.LoggerFactory;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
 
 import static edu.snu.nemo.runtime.common.state.StageState.State.COMPLETE;
 import static edu.snu.nemo.runtime.common.state.StageState.State.EXECUTING;
@@ -65,13 +67,12 @@ import static junit.framework.TestCase.assertFalse;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests fault tolerance.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ContainerManager.class, BlockManagerMaster.class, SchedulerRunner.class,
+@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class,
     PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class})
 public final class FaultToleranceTest {
   private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceTest.class.getName());
@@ -79,7 +80,7 @@ public final class FaultToleranceTest {
   private Scheduler scheduler;
   private SchedulingPolicy schedulingPolicy;
   private SchedulerRunner schedulerRunner;
-  private ContainerManager containerManager;
+  private ExecutorRegistry executorRegistry;
 
   private MetricMessageHandler metricMessageHandler;
   private PendingTaskGroupQueue pendingTaskGroupQueue;
@@ -95,7 +96,6 @@ public final class FaultToleranceTest {
 
   @Before
   public void setUp() throws Exception {
-    RuntimeTestUtil.initialize();
     irDAGBuilder = new DAGBuilder<>();
 
     metricMessageHandler = mock(MetricMessageHandler.class);
@@ -107,15 +107,12 @@ public final class FaultToleranceTest {
     physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class);
   }
 
-  private void setUpExecutors(final Map<String, ExecutorRepresenter> executorRepresenterMap,
-                              final Map<String, ExecutorRepresenter> failedexecutorRepresenterMap,
-                              final boolean useMockSchedulerRunner) {
-    containerManager = mock(ContainerManager.class);
-    when(containerManager.getExecutorRepresenterMap()).thenReturn(executorRepresenterMap);
-    when(containerManager.getFailedExecutorRepresenterMap()).thenReturn(failedexecutorRepresenterMap);
+  private void setUpExecutors(final Collection<ExecutorRepresenter> executors,
+                              final boolean useMockSchedulerRunner) throws InjectionException {
+    executorRegistry = Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
 
     pendingTaskGroupQueue = new SingleJobTaskGroupQueue();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(containerManager, TEST_TIMEOUT_MS);
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, TEST_TIMEOUT_MS);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
@@ -127,7 +124,9 @@ public final class FaultToleranceTest {
             blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler);
 
     // Add nodes
-    executorRepresenterMap.keySet().forEach(executorId -> scheduler.onExecutorAdded(executorId));
+    for (final ExecutorRepresenter executor : executors) {
+      scheduler.onExecutorAdded(executor);
+    }
   }
 
   private PhysicalPlan buildPlan() throws Exception {
@@ -186,23 +185,18 @@ public final class FaultToleranceTest {
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
     final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final ExecutorRepresenter a3 =
-        new ExecutorRepresenter("a3", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a2 =
-        new ExecutorRepresenter("a2", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a1 =
-        new ExecutorRepresenter("a1", computeSpec, mockMsgSender, activeContext, serExecutorService);
-
-    final Map<String, ExecutorRepresenter> executorMap = new HashMap<>();
-    executorMap.put("a1", a1);
-    executorMap.put("a2", a2);
-    executorMap.put("a3", a3);
-
-    final Map<String, ExecutorRepresenter> failedExecutorMap = new HashMap<>();
-    failedExecutorMap.put("a2", a2);
-    failedExecutorMap.put("a3", a2);
-
-    setUpExecutors(executorMap, failedExecutorMap, true);
+    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
+    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
+    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
+    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
+
+    final List<ExecutorRepresenter> executors = new ArrayList<>();
+    executors.add(a1);
+    executors.add(a2);
+    executors.add(a3);
+
+    setUpExecutors(executors, true);
     final PhysicalPlan plan = buildPlan();
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
@@ -217,7 +211,7 @@ public final class FaultToleranceTest {
         RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupQueue.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
@@ -234,7 +228,7 @@ public final class FaultToleranceTest {
         RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupQueue.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else {
         // There are 2 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 2.
@@ -257,8 +251,6 @@ public final class FaultToleranceTest {
         }
       }
     }
-
-    RuntimeTestUtil.cleanup();
   }
 
   /**
@@ -270,18 +262,17 @@ public final class FaultToleranceTest {
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
     final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final ExecutorRepresenter a3 =
-        new ExecutorRepresenter("a3", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a2 =
-        new ExecutorRepresenter("a2", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a1 =
-        new ExecutorRepresenter("a1", computeSpec, mockMsgSender, activeContext, serExecutorService);
-
-    final Map<String, ExecutorRepresenter> executorMap = new HashMap<>();
-    executorMap.put("a1", a1);
-    executorMap.put("a2", a2);
-    executorMap.put("a3", a3);
-    setUpExecutors(executorMap, Collections.emptyMap(), true);
+    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
+    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
+    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
+    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
+
+    final List<ExecutorRepresenter> executors = new ArrayList<>();
+    executors.add(a1);
+    executors.add(a2);
+    executors.add(a3);
+    setUpExecutors(executors, true);
 
     final PhysicalPlan plan = buildPlan();
     final JobStateManager jobStateManager =
@@ -297,14 +288,14 @@ public final class FaultToleranceTest {
         RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupQueue.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
         RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupQueue.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
               TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
 
@@ -320,8 +311,6 @@ public final class FaultToleranceTest {
         });
       }
     }
-
-    RuntimeTestUtil.cleanup();
   }
 
   /**
@@ -333,18 +322,17 @@ public final class FaultToleranceTest {
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
     final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final ExecutorRepresenter a3 =
-        new ExecutorRepresenter("a3", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a2 =
-        new ExecutorRepresenter("a2", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a1 =
-        new ExecutorRepresenter("a1", computeSpec, mockMsgSender, activeContext, serExecutorService);
-
-    final Map<String, ExecutorRepresenter> executorMap = new HashMap<>();
-    executorMap.put("a1", a1);
-    executorMap.put("a2", a2);
-    executorMap.put("a3", a3);
-    setUpExecutors(executorMap, Collections.emptyMap(), true);
+    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
+    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
+    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
+    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
+
+    final List<ExecutorRepresenter> executors = new ArrayList<>();
+    executors.add(a1);
+    executors.add(a2);
+    executors.add(a3);
+    setUpExecutors(executors, true);
 
     final PhysicalPlan plan = buildPlan();
     final JobStateManager jobStateManager =
@@ -360,14 +348,14 @@ public final class FaultToleranceTest {
         RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
         assertTrue(pendingTaskGroupQueue.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
         RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
 
         stage.getTaskGroupIds().forEach(taskGroupId ->
-          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+          RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
               TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
 
@@ -382,8 +370,6 @@ public final class FaultToleranceTest {
         });
       }
     }
-
-    RuntimeTestUtil.cleanup();
   }
 
   /**
@@ -395,23 +381,19 @@ public final class FaultToleranceTest {
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
     final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0);
-    final ExecutorRepresenter a3 =
-        new ExecutorRepresenter("a3", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a2 =
-        new ExecutorRepresenter("a2", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a1 =
-        new ExecutorRepresenter("a1", computeSpec, mockMsgSender, activeContext, serExecutorService);
-
-    final Map<String, ExecutorRepresenter> executorMap = new HashMap<>();
-    executorMap.put("a1", a1);
-    executorMap.put("a2", a2);
-    executorMap.put("a3", a3);
-
-    final Map<String, ExecutorRepresenter> failedExecutorMap = new HashMap<>();
-    failedExecutorMap.put("a2", a2);
-    failedExecutorMap.put("a3", a2);
-
-    setUpExecutors(executorMap, failedExecutorMap, false);
+    final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
+    final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3");
+    final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2");
+    final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1");
+
+    final List<ExecutorRepresenter> executors = new ArrayList<>();
+    executors.add(a1);
+    executors.add(a2);
+    executors.add(a3);
+
+    setUpExecutors(executors, false);
+
     final PhysicalPlan plan = buildPlan();
     final JobStateManager jobStateManager =
         new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT);
@@ -427,16 +409,14 @@ public final class FaultToleranceTest {
         final Set<String> a3RunningTaskGroups = new HashSet<>(a3.getRunningTaskGroups());
 
         a1RunningTaskGroups.forEach(taskGroupId ->
-            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
                 taskGroupId, TaskGroupState.State.COMPLETE, 1));
 
         a3RunningTaskGroups.forEach(taskGroupId ->
-            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, containerManager,
+            RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
                 taskGroupId, TaskGroupState.State.COMPLETE, 1));
       }
     }
     assertTrue(jobStateManager.checkJobTermination());
-
-    RuntimeTestUtil.cleanup();
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index dd9e524..72b19b7 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -21,12 +21,14 @@ import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
 import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.reef.driver.context.ActiveContext;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -37,6 +39,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -48,58 +51,74 @@ import static org.mockito.Mockito.*;
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ContainerManager.class, JobStateManager.class})
+@PrepareForTest(JobStateManager.class)
 public final class RoundRobinSchedulingPolicyTest {
-  private static final int TIMEOUT_MS = 1000;
+  private static final int TIMEOUT_MS = 2000;
 
   private SchedulingPolicy schedulingPolicy;
-  private ContainerManager containerManager = mock(ContainerManager.class);
+  private ExecutorRegistry executorRegistry;
   private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
   private JobStateManager jobStateManager = mock(JobStateManager.class);
 
   // This schedule index will make sure that task group events are not ignored
   private static final int MAGIC_SCHEDULE_ATTEMPT_INDEX = Integer.MAX_VALUE;
+  private static final String RESERVED_EXECUTOR_ID = "RESERVED";
 
   @Before
-  public void setUp() {
-    final Map<String, ExecutorRepresenter> executorRepresenterMap = new HashMap<>();
-    when(containerManager.getExecutorRepresenterMap()).thenReturn(executorRepresenterMap);
-    when(containerManager.getFailedExecutorRepresenterMap()).thenReturn(executorRepresenterMap);
+  public void setUp() throws InjectionException {
+    executorRegistry = Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
 
-    schedulingPolicy = new RoundRobinSchedulingPolicy(containerManager, TIMEOUT_MS);
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, TIMEOUT_MS);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
     final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
     final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 1, 0);
-    final ExecutorRepresenter a3 =
-        new ExecutorRepresenter("a3", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a2 =
-        new ExecutorRepresenter("a2", computeSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter a1 =
-        new ExecutorRepresenter("a1", computeSpec, mockMsgSender, activeContext, serExecutorService);
+    final Function<String, ExecutorRepresenter> computeSpecExecutorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
+    final ExecutorRepresenter a3 = computeSpecExecutorRepresenterGenerator.apply("a3");
+    final ExecutorRepresenter a2 = computeSpecExecutorRepresenterGenerator.apply("a2");
+    final ExecutorRepresenter a1 = computeSpecExecutorRepresenterGenerator.apply("a1");
 
     final ResourceSpecification storageSpec = new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, 1, 0);
-    final ExecutorRepresenter b2 =
-        new ExecutorRepresenter("b2", storageSpec, mockMsgSender, activeContext, serExecutorService);
-    final ExecutorRepresenter b1 =
-        new ExecutorRepresenter("b1", storageSpec, mockMsgSender, activeContext, serExecutorService);
+    final Function<String, ExecutorRepresenter> storageSpecExecutorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, storageSpec, mockMsgSender, activeContext, serExecutorService, executorId);
+    final ExecutorRepresenter b2 = storageSpecExecutorRepresenterGenerator.apply("b2");
+    final ExecutorRepresenter b1 = storageSpecExecutorRepresenterGenerator.apply("b1");
 
-    executorRepresenterMap.put(a1.getExecutorId(), a1);
-    executorRepresenterMap.put(a2.getExecutorId(), a2);
-    executorRepresenterMap.put(a3.getExecutorId(), a3);
-    executorRepresenterMap.put(b1.getExecutorId(), b1);
-    executorRepresenterMap.put(b2.getExecutorId(), b2);
+    final ResourceSpecification reservedSpec = new ResourceSpecification(ExecutorPlacementProperty.RESERVED, 1, 0);
+    final Function<String, ExecutorRepresenter> reservedSpecExecutorRepresenterGenerator = executorId ->
+        new ExecutorRepresenter(executorId, reservedSpec, mockMsgSender, activeContext, serExecutorService, executorId);
+    final ExecutorRepresenter r = reservedSpecExecutorRepresenterGenerator.apply(RESERVED_EXECUTOR_ID);
 
     // Add compute nodes
-    schedulingPolicy.onExecutorAdded(a3.getExecutorId());
-    schedulingPolicy.onExecutorAdded(a2.getExecutorId());
-    schedulingPolicy.onExecutorAdded(a1.getExecutorId());
+    schedulingPolicy.onExecutorAdded(a3);
+    schedulingPolicy.onExecutorAdded(a2);
+    schedulingPolicy.onExecutorAdded(a1);
 
     // Add storage nodes
-    schedulingPolicy.onExecutorAdded(b2.getExecutorId());
-    schedulingPolicy.onExecutorAdded(b1.getExecutorId());
+    schedulingPolicy.onExecutorAdded(b2);
+    schedulingPolicy.onExecutorAdded(b1);
+
+    // Add reserved node
+    schedulingPolicy.onExecutorAdded(r);
+  }
+
+  @Test
+  public void testWakeupFromAwaitByTaskGroupCompletion() {
+    final Timer timer = new Timer();
+    final List<ScheduledTaskGroup> scheduledTaskGroups =
+        convertToScheduledTaskGroups(5, new byte[0], "Stage", ExecutorPlacementProperty.RESERVED);
+    assertTrue(schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(0), jobStateManager));
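+    // The single RESERVED executor (capacity 1) is now full; the timer thread below completes the
+    // running TaskGroup after 1s, which should wake up the blocked scheduling call for the next one.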
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        schedulingPolicy.onTaskGroupExecutionComplete(RESERVED_EXECUTOR_ID,
+            scheduledTaskGroups.get(0).getTaskGroupId());
+      }
+    }, 1000);
+    assertTrue(schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(1), jobStateManager));
   }
 
   @Test
@@ -109,7 +128,7 @@ public final class RoundRobinSchedulingPolicyTest {
 
   @Test
   public void testNoneContainerType() {
-    final int slots = 5;
+    final int slots = 6;
     final List<ScheduledTaskGroup> scheduledTaskGroups =
         convertToScheduledTaskGroups(slots + 1, new byte[0], "Stage A", ExecutorPlacementProperty.NONE);
 
@@ -174,12 +193,10 @@ public final class RoundRobinSchedulingPolicyTest {
     b2 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(2), jobStateManager);
     assertTrue(b2);
 
-    containerManager.onExecutorRemoved("b1");
     Set<String> executingTaskGroups = schedulingPolicy.onExecutorRemoved("b1");
     assertEquals(1, executingTaskGroups.size());
     assertEquals(scheduledTaskGroupsB.get(2).getTaskGroupId(), executingTaskGroups.iterator().next());
 
-    containerManager.onExecutorRemoved("a1");
     executingTaskGroups = schedulingPolicy.onExecutorRemoved("a1");
     assertEquals(1, executingTaskGroups.size());
     assertEquals(scheduledTaskGroupsA.get(3).getTaskGroupId(), executingTaskGroups.iterator().next());
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
new file mode 100644
index 0000000..baf18cf
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -0,0 +1,492 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
+import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
+import edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+
+/**
+ * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({JobStateManager.class, ExecutorRepresenter.class, RoundRobinSchedulingPolicy.class,
+    ScheduledTaskGroup.class, Readable.class})
+public final class SourceLocationAwareSchedulingPolicyTest {
+  private static final String SITE_0 = "SEOUL";
+  private static final String SITE_1 = "JINJU";
+  private static final String SITE_2 = "BUSAN";
+
+  private SourceLocationAwareSchedulingPolicy sourceLocationAware;
+  private SpiedSchedulingPolicyWrapper<RoundRobinSchedulingPolicy> roundRobin;
+  private MockJobStateManagerWrapper jobStateManager;
+
+  private void setup(final int schedulerTimeoutMs) {
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    jobStateManager = new MockJobStateManagerWrapper();
+
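+    // Construct a real ExecutorRegistry and a spied RoundRobinSchedulingPolicy, then inject them
+    // into the SourceLocationAwareSchedulingPolicy under test.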
+    final ExecutorRegistry executorRegistry = new ExecutorRegistry();
+    final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy =
+        new RoundRobinSchedulingPolicy(executorRegistry, schedulerTimeoutMs);
+    roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, jobStateManager.get());
+
+    injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, roundRobin.get());
+    injector.bindVolatileInstance(JobStateManager.class, jobStateManager.get());
+    injector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry);
+    try {
+      sourceLocationAware = injector.getInstance(SourceLocationAwareSchedulingPolicy.class);
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @After
+  public void teardown() {
+    // All expectations should be resolved at this time.
+    roundRobin.ensureNoUnresolvedExpectation();
+  }
+
+  /**
+   * {@link SourceLocationAwareSchedulingPolicy} should delegate the scheduling decision to
+   * {@link RoundRobinSchedulingPolicy} when the {@link ScheduledTaskGroup} does not have any source tasks.
+   */
+  @Test
+  public void testRoundRobinSchedulerFallback() {
+    setup(500);
+
+    // Prepare test scenario
+    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE);
+    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2,
+        ExecutorPlacementProperty.NONE);
+    final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWhichThrowException(5,
+        ExecutorPlacementProperty.NONE);
+    addExecutor(new MockExecutorRepresenterWrapper(SITE_0, ExecutorPlacementProperty.NONE, 1));
+    addExecutor(new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
+
+    // Trying to schedule tg0: expected to fall back to RoundRobinSchedulingPolicy
+    roundRobin.expectSchedulingRequest(tg0);
+    // ...and the scheduling attempt must succeed
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
+    // ...thus the TaskGroup should be running
+    jobStateManager.assertTaskGroupState(tg0.getTaskGroupId(), TaskGroupState.State.EXECUTING);
+
+    // Trying to schedule tg1: expected to fall back to RoundRobinSchedulingPolicy
+    roundRobin.expectSchedulingRequest(tg1);
+    // ...and the scheduling attempt must succeed
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
+    // ...thus the TaskGroup should be running
+    jobStateManager.assertTaskGroupState(tg1.getTaskGroupId(), TaskGroupState.State.EXECUTING);
+
+    // Trying to schedule tg2: expected to fall back to RoundRobinSchedulingPolicy
+    roundRobin.expectSchedulingRequest(tg2);
+    // ...and the scheduling attempt must succeed
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, jobStateManager.get()));
+    // ...thus the TaskGroup should be running
+    jobStateManager.assertTaskGroupState(tg2.getTaskGroupId(), TaskGroupState.State.EXECUTING);
+  }
+
+  /**
+   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ScheduledTaskGroup} when
+   * there are no executors in the appropriate location(s).
+   */
+  @Test
+  public void testSourceLocationAwareSchedulingNotAvailable() {
+    setup(500);
+
+    // Prepare test scenario
+    final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_0)), ExecutorPlacementProperty.NONE);
+    final MockExecutorRepresenterWrapper e0 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
+    final MockExecutorRepresenterWrapper e1 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
+
+    // Attempt to schedule tg must fail (fallback to round robin policy is not expected)
+    assertFalse(sourceLocationAware.scheduleTaskGroup(tg, jobStateManager.get()));
+    // Thus executors should have no running TaskGroups at all
+    e0.assertScheduledTaskGroups(Collections.emptyList());
+    e1.assertScheduledTaskGroups(Collections.emptyList());
+  }
+
+  private static final String CONTAINER_TYPE_A = "A";
+
+  /**
+   * {@link SourceLocationAwareSchedulingPolicy} should schedule a TG to one of the executors with an appropriate
+   * location and container type.
+   */
+  @Test
+  public void testSourceLocationAwareSchedulingWithContainerType() {
+    setup(500);
+
+    // Prepare test scenario
+    final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
+    final MockExecutorRepresenterWrapper e0 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_0, ExecutorPlacementProperty.NONE, 1));
+    final MockExecutorRepresenterWrapper e1 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
+    final MockExecutorRepresenterWrapper e2 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_2, ExecutorPlacementProperty.NONE, 1));
+    final MockExecutorRepresenterWrapper e3 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_0, CONTAINER_TYPE_A, 1));
+    final MockExecutorRepresenterWrapper e4 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1));
+    final MockExecutorRepresenterWrapper e5 = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_2, CONTAINER_TYPE_A, 1));
+
+    // The attempt to schedule tg must succeed (fallback to round robin is not expected)
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg, jobStateManager.get()));
+    // tg must run on e3
+    e0.assertScheduledTaskGroups(Collections.emptyList());
+    e1.assertScheduledTaskGroups(Collections.emptyList());
+    e2.assertScheduledTaskGroups(Collections.emptyList());
+    e3.assertScheduledTaskGroups(Collections.singletonList(tg));
+    e4.assertScheduledTaskGroups(Collections.emptyList());
+    e5.assertScheduledTaskGroups(Collections.emptyList());
+  }
+
+  /**
+   * {@link SourceLocationAwareSchedulingPolicy} should not schedule more TGs than executor capacity allows.
+   */
+  @Test
+  public void testSourceLocationAwareSchedulingDoesNotOverSchedule() {
+    setup(500);
+
+    // Prepare test scenario
+    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
+    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
+    final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
+    final ScheduledTaskGroup tg3 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
+    final MockExecutorRepresenterWrapper e = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_0, CONTAINER_TYPE_A, 3));
+
+    // The first three scheduling attempts must succeed (fallback to round robin is not expected)
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, jobStateManager.get()));
+
+    // This must fail
+    assertFalse(sourceLocationAware.scheduleTaskGroup(tg3, jobStateManager.get()));
+
+    // Expected executor status
+    e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2));
+  }
+
+  /**
+   * {@link SourceLocationAwareSchedulingPolicy} should properly schedule TGs with multiple source locations.
+   */
+  @Test
+  public void testSourceLocationAwareSchedulingWithMultiSource() {
+    setup(500);
+
+    // Prepare test scenario
+    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
+    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)), CONTAINER_TYPE_A);
+    final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
+            Arrays.asList(SITE_1, SITE_2)), CONTAINER_TYPE_A);
+    final ScheduledTaskGroup tg3 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
+            Arrays.asList(SITE_0, SITE_2)), CONTAINER_TYPE_A);
+    final MockExecutorRepresenterWrapper e = addExecutor(
+        new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 4));
+
+    // All scheduling attempts must succeed (fallback to round robin is not expected)
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, jobStateManager.get()));
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg3, jobStateManager.get()));
+
+    // Expected executor status
+    e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2, tg3));
+  }
+
+  /**
+   * If no appropriate executor is available, {@link SourceLocationAwareSchedulingPolicy} should wait
+   * for up to the given amount of time, waking up immediately when an executor is added.
+   */
+  @Test
+  public void testWakeupFromAwaitByExecutorAddition() {
+    // We need a timeout value that is long enough.
+    setup(20000);
+    final Timer timer = new Timer();
+
+    // Prepare test scenario
+    final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
+    final MockExecutorRepresenterWrapper e = new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1);
+
+    // The executor will be available in 1000ms.
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        addExecutor(e);
+      }
+    }, 1000);
+    // The attempt to schedule the TG must succeed once the executor becomes available
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg, jobStateManager.get()));
+  }
+
+  /**
+   * If no appropriate executor is available, {@link SourceLocationAwareSchedulingPolicy} should wait
+   * for up to the given amount of time, waking up immediately when a TaskGroup completes.
+   */
+  @Test
+  public void testWakeupFromAwaitByTaskGroupCompletion() {
+    // We need a timeout value that is long enough.
+    setup(20000);
+    final Timer timer = new Timer();
+
+    // Prepare test scenario
+    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
+    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
+    final MockExecutorRepresenterWrapper e = addExecutor(new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1));
+
+    // The attempt to schedule tg0 must succeed
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
+    // The TaskGroup will be completed in 1000ms.
+    timer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        sourceLocationAware.onTaskGroupExecutionComplete(e.get().getExecutorId(), tg0.getTaskGroupId());
+      }
+    }, 1000);
+    // The attempt to schedule tg1 must succeed after tg0 completes
+    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
+  }
+
+  private MockExecutorRepresenterWrapper addExecutor(final MockExecutorRepresenterWrapper executor) {
+    sourceLocationAware.onExecutorAdded(executor.get());
+    return executor;
+  }
+
+  /**
+   * Utility for creating {@link ScheduledTaskGroup}.
+   */
+  private static final class CreateScheduledTaskGroup {
+    private static final AtomicInteger taskGroupIndex = new AtomicInteger(0);
+    private static final AtomicInteger taskIndex = new AtomicInteger(0);
+
+    private static ScheduledTaskGroup doCreate(final Collection<Readable> readables, final String containerType) {
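+      // Builds a mocked ScheduledTaskGroup with a generated TaskGroup id, a generated
+      // logical-task-id-to-Readable map for the given Readables, and the given container type.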
+      final ScheduledTaskGroup mockInstance = mock(ScheduledTaskGroup.class);
+      final Map<String, Readable> readableMap = new HashMap<>();
+      readables.forEach(readable -> readableMap.put(String.format("TASK-%d", taskIndex.getAndIncrement()),
+          readable));
+      when(mockInstance.getTaskGroupId()).thenReturn(String.format("TG-%d", taskGroupIndex.getAndIncrement()));
+      when(mockInstance.getLogicalTaskIdToReadable()).thenReturn(readableMap);
+      when(mockInstance.getContainerType()).thenReturn(containerType);
+      return mockInstance;
+    }
+
+    static ScheduledTaskGroup withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation,
+                                                               final String containerType) {
+      try {
+        final List<Readable> readables = new ArrayList<>();
+        for (final List<String> locations : sourceLocation) {
+          final Readable readable = mock(Readable.class);
+          when(readable.getLocations()).thenReturn(locations);
+          readables.add(readable);
+        }
+        return doCreate(readables, containerType);
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static ScheduledTaskGroup withReadablesWithoutSourceLocations(final int numReadables,
+                                                                  final String containerType) {
+      try {
+        final List<Readable> readables = new ArrayList<>();
+        for (int i = 0; i < numReadables; i++) {
+          final Readable readable = mock(Readable.class);
+          when(readable.getLocations()).thenReturn(Collections.emptyList());
+          readables.add(readable);
+        }
+        return doCreate(readables, containerType);
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static ScheduledTaskGroup withReadablesWhichThrowException(final int numReadables,
+                                                               final String containerType) {
+      try {
+        final List<Readable> readables = new ArrayList<>();
+        for (int i = 0; i < numReadables; i++) {
+          final Readable readable = mock(Readable.class);
+          when(readable.getLocations()).thenThrow(new Exception("EXCEPTION"));
+          readables.add(readable);
+        }
+        return doCreate(readables, containerType);
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    static ScheduledTaskGroup withoutReadables(final String containerType) {
+      return doCreate(Collections.emptyList(), containerType);
+    }
+  }
+
+  /**
+   * Wrapper for mock {@link ExecutorRepresenter}.
+   */
+  private static final class MockExecutorRepresenterWrapper {
+    private static final AtomicInteger executorIndex = new AtomicInteger(0);
+
+    private final ExecutorRepresenter mockInstance;
+    private final List<ScheduledTaskGroup> scheduledTaskGroups = new ArrayList<>();
+
+    MockExecutorRepresenterWrapper(final String nodeName, final String containerType, final int capacity) {
+      mockInstance = mock(ExecutorRepresenter.class);
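+      // Record TaskGroups scheduled to this mock executor in a local list, and remove them on completion,
+      // so tests can assert on the executor's final set of scheduled TaskGroups.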
+      doAnswer(invocationOnMock -> {
+        final ScheduledTaskGroup scheduledTaskGroup = invocationOnMock.getArgument(0);
+        scheduledTaskGroups.add(scheduledTaskGroup);
+        return null;
+      }).when(mockInstance).onTaskGroupScheduled(any(ScheduledTaskGroup.class));
+      doAnswer(invocationOnMock -> {
+        final String taskGroupId = invocationOnMock.getArgument(0);
+        scheduledTaskGroups.removeIf(scheduledTaskGroup -> scheduledTaskGroup.getTaskGroupId().equals(taskGroupId));
+        return null;
+      }).when(mockInstance).onTaskGroupExecutionComplete(anyString());
+      when(mockInstance.getExecutorId()).thenReturn(String.format("EXECUTOR-%d", executorIndex.getAndIncrement()));
+      when(mockInstance.getNodeName()).thenReturn(nodeName);
+      when(mockInstance.getContainerType()).thenReturn(containerType);
+      doAnswer(invocationOnMock ->
+          scheduledTaskGroups.stream().map(ScheduledTaskGroup::getTaskGroupId).collect(Collectors.toSet()))
+          .when(mockInstance).getRunningTaskGroups();
+      when(mockInstance.getExecutorCapacity()).thenReturn(capacity);
+    }
+
+    void assertScheduledTaskGroups(final List<ScheduledTaskGroup> expected) {
+      assertEquals(expected, scheduledTaskGroups);
+    }
+
+    ExecutorRepresenter get() {
+      return mockInstance;
+    }
+  }
+
+  /**
+   * Wrapper for spied {@link SchedulingPolicy}.
+   * @param <T> the class of the spied instance
+   */
+  private static final class SpiedSchedulingPolicyWrapper<T extends SchedulingPolicy> {
+    private final T spiedInstance;
+    private ScheduledTaskGroup expectedArgument = null;
+
+    SpiedSchedulingPolicyWrapper(final T schedulingPolicy, final JobStateManager jobStateManager) {
+      spiedInstance = spy(schedulingPolicy);
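+      // Stub scheduleTaskGroup(): assert the call matches the expected TaskGroup, clear the expectation,
+      // report the TaskGroup as EXECUTING to the mocked JobStateManager, and return success.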
+      doAnswer(invocationOnMock -> {
+        final ScheduledTaskGroup scheduledTaskGroup = invocationOnMock.getArgument(0);
+        assertEquals(expectedArgument, scheduledTaskGroup);
+        expectedArgument = null;
+        jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
+        return true;
+      }).when(spiedInstance).scheduleTaskGroup(any(ScheduledTaskGroup.class), any());
+    }
+
+    /**
+     * Sets expected {@link SchedulingPolicy#scheduleTaskGroup(ScheduledTaskGroup, JobStateManager)} invocation
+     * on this spied object.
+     * @param scheduledTaskGroup expected parameter for the task group to schedule
+     */
+    void expectSchedulingRequest(final ScheduledTaskGroup scheduledTaskGroup) {
+      ensureNoUnresolvedExpectation();
+      this.expectedArgument = scheduledTaskGroup;
+    }
+
+    void ensureNoUnresolvedExpectation() {
+      assertEquals(null, expectedArgument);
+    }
+
+    /**
+     * @return spied instance for {@link SchedulingPolicy}.
+     */
+    T get() {
+      return spiedInstance;
+    }
+  }
+
+  /**
+   * Wrapper for mock {@link JobStateManager} instance.
+   */
+  private static final class MockJobStateManagerWrapper {
+    private final JobStateManager mockInstance;
+    private final Map<String, TaskGroupState.State> taskGroupStates = new HashMap<>();
+
+    MockJobStateManagerWrapper() {
+      mockInstance = mock(JobStateManager.class);
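+      // Record each TaskGroup state transition so tests can later assert on the final states.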
+      doAnswer(invocationOnMock -> {
+        final String taskGroupId = invocationOnMock.getArgument(0);
+        final TaskGroupState.State newState = invocationOnMock.getArgument(1);
+        taskGroupStates.put(taskGroupId, newState);
+        return null;
+      }).when(mockInstance).onTaskGroupStateChanged(anyString(), any(TaskGroupState.State.class));
+    }
+
+    /**
+     * Ensures the TaskGroup state has been changed as expected.
+     * @param taskGroupId id of the TaskGroup
+     * @param state the expected state
+     */
+    void assertTaskGroupState(final String taskGroupId, final TaskGroupState.State state) {
+      assertEquals(state, taskGroupStates.get(taskGroupId));
+    }
+
+    /**
+     * @return mock instance for {@link JobStateManager}.
+     */
+    JobStateManager get() {
+      return mockInstance;
+    }
+  }
+}
