nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jan...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-53] Make SchedulingPolicy Stackable (#15)
Date Tue, 15 May 2018 08:00:41 GMT
This is an automated email from the ASF dual-hosted git repository.

jangho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new f36d2e2  [NEMO-53] Make SchedulingPolicy Stackable (#15)
f36d2e2 is described below

commit f36d2e263fefcd3002315698d6d17ab3b267f94f
Author: Jae Hyeon Park <usezmap@gmail.com>
AuthorDate: Tue May 15 17:00:39 2018 +0900

    [NEMO-53] Make SchedulingPolicy Stackable (#15)
    
    JIRA: [NEMO-53: Make SchedulingPolicy Stackable](https://issues.apache.org/jira/browse/NEMO-53)
    
    **Major changes:**
    - Changed `SchedulingPolicy` interface to contain only one method: `filterExecutorRepresenters`, which filters `Set` of `ExecutorRepresenter`.
    - Moved the actual scheduling functionality out of the scheduling policies and into `SchedulerRunner`.
    
    **Minor changes to note:**
    - Divided some policies into more specific ones, such as `FreeSlotSchedulingPolicy` and `ContainerTypeAwareSchedulingPolicy`.
    
    **Tests for the changes:**
    - Refactored the original tests so that they pass with the new `SchedulingPolicy`.
    - Fixed `FaultToleranceTest` so that it no longer relies on the previous `RoundRobinSchedulingPolicy`, which performed the actual scheduling.
    
    **Other comments:**
    - For now, `CompositeSchedulingPolicy` relies on Tang injection to obtain its policy instances, but there may be a better way to handle this.
    
    resolves [NEMO-#53](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-53)
---
 .../master/scheduler/BatchSingleJobScheduler.java  |  20 +-
 .../scheduler/CompositeSchedulingPolicy.java       |  56 ++++
 .../ContainerTypeAwareSchedulingPolicy.java        |  58 ++++
 .../master/scheduler/FreeSlotSchedulingPolicy.java |  51 ++++
 .../scheduler/RoundRobinSchedulingPolicy.java      | 175 ++---------
 .../runtime/master/scheduler/SchedulerRunner.java  |  42 ++-
 .../runtime/master/scheduler/SchedulingPolicy.java |  63 +---
 .../SourceLocationAwareSchedulingPolicy.java       | 141 ++-------
 .../snu/nemo/tests/runtime/RuntimeTestUtil.java    |  14 +-
 .../executor/datatransfer/DataTransferTest.java    |  10 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     |   6 +-
 .../ContainerTypeAwareSchedulingPolicyTest.java    |  76 +++++
 .../master/scheduler/FaultToleranceTest.java       |  53 ++--
 .../scheduler/FreeSlotSchedulingPolicyTest.java    |  66 ++++
 .../scheduler/RoundRobinSchedulingPolicyTest.java  | 122 ++------
 .../SourceLocationAwareSchedulingPolicyTest.java   | 332 ++-------------------
 16 files changed, 508 insertions(+), 777 deletions(-)

diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index d579b4d..aa27c0e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
   private final SchedulingPolicy schedulingPolicy;
   private final SchedulerRunner schedulerRunner;
   private final PendingTaskGroupCollection pendingTaskGroupCollection;
+  private final ExecutorRegistry executorRegistry;
 
   /**
    * Other necessary components of this {@link edu.snu.nemo.runtime.master.RuntimeMaster}.
@@ -83,7 +84,8 @@ public final class BatchSingleJobScheduler implements Scheduler {
                                  final PendingTaskGroupCollection pendingTaskGroupCollection,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
-                                 final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler) {
+                                 final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
+                                 final ExecutorRegistry executorRegistry) {
     this.schedulingPolicy = schedulingPolicy;
     this.schedulerRunner = schedulerRunner;
     this.pendingTaskGroupCollection = pendingTaskGroupCollection;
@@ -94,6 +96,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
       pubSubEventHandlerWrapper.getPubSubEventHandler()
           .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
     }
+    this.executorRegistry = executorRegistry;
   }
 
   /**
@@ -166,7 +169,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    schedulingPolicy.onExecutorAdded(executorRepresenter);
+    executorRegistry.registerRepresenter(executorRepresenter);
     schedulerRunner.onAnExecutorAvailable();
   }
 
@@ -178,7 +181,10 @@ public final class BatchSingleJobScheduler implements Scheduler {
     taskGroupsToReExecute.addAll(blockManagerMaster.removeWorker(executorId));
 
     // TaskGroups executing on the removed executor
-    taskGroupsToReExecute.addAll(schedulingPolicy.onExecutorRemoved(executorId));
+    executorRegistry.setRepresenterAsFailed(executorId);
+    final ExecutorRepresenter executor = executorRegistry.getFailedExecutorRepresenter(executorId);
+    executor.onExecutorFailed();
+    taskGroupsToReExecute.addAll(executor.getFailedTaskGroups());
 
     taskGroupsToReExecute.forEach(failedTaskGroupId ->
       onTaskGroupStateChanged(executorId, failedTaskGroupId, TaskGroupState.State.FAILED_RECOVERABLE,
@@ -430,7 +436,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
                                             final Boolean isOnHoldToComplete) {
     LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
     if (!isOnHoldToComplete) {
-      schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+      executorRegistry.getRunningExecutorRepresenter(executorId).onTaskGroupExecutionComplete(taskGroupId);
     }
 
     final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
@@ -453,7 +459,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
                                           final String taskGroupId,
                                           final String taskPutOnHold) {
     LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
-    schedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
+    executorRegistry.getRunningExecutorRepresenter(executorId).onTaskGroupExecutionComplete(taskGroupId);
     final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
 
     final boolean stageComplete =
@@ -493,7 +499,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
                                                      final int attemptIdx, final TaskGroupState.State newState,
                                                      final TaskGroupState.RecoverableFailureCause failureCause) {
     LOG.info("{} failed in {} by {}", new Object[]{taskGroupId, executorId, failureCause});
-    schedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
+    executorRegistry.getExecutorRepresenter(executorId).onTaskGroupExecutionFailed(taskGroupId);
 
     final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
     final int attemptIndexForStage =
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
new file mode 100644
index 0000000..ac02183
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Temporary class to implement stacked scheduling policy.
+ * For now, policies are injected through Tang, but they should become configurable by users
+ * once Nemo supports job-wide execution properties.
+ * TODO #69: Support job-wide execution property.
+ */
+public final class CompositeSchedulingPolicy implements SchedulingPolicy {
+  private final List<SchedulingPolicy> schedulingPolicies;
+
+  @Inject
+  private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy sourceLocationAwareSchedulingPolicy,
+                                    final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy,
+                                    final FreeSlotSchedulingPolicy freeSlotSchedulingPolicy,
+                                    final ContainerTypeAwareSchedulingPolicy containerTypeAwareSchedulingPolicy) {
+    schedulingPolicies = Arrays.asList(
+        freeSlotSchedulingPolicy,
+        containerTypeAwareSchedulingPolicy,
+        sourceLocationAwareSchedulingPolicy,
+        roundRobinSchedulingPolicy);
+  }
+
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+    Set<ExecutorRepresenter> candidates = executorRepresenterSet;
+    for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
+      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, scheduledTaskGroup);
+    }
+    return candidates;
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
new file mode 100644
index 0000000..b88c0b6
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This policy finds executors that have the corresponding container type.
+ */
+public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolicy {
+
+  @VisibleForTesting
+  @Inject
+  public ContainerTypeAwareSchedulingPolicy() {
+  }
+
+  /**
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
+   *                               If the container type of target TaskGroup is NONE, it will return the original set.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
+   */
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+
+    if (scheduledTaskGroup.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+      return executorRepresenterSet;
+    }
+
+    final Set<ExecutorRepresenter> candidateExecutors =
+        executorRepresenterSet.stream()
+            .filter(executor -> executor.getContainerType().equals(scheduledTaskGroup.getContainerType()))
+            .collect(Collectors.toSet());
+
+    return candidateExecutors;
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
new file mode 100644
index 0000000..1eb2958
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+
+import javax.inject.Inject;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This policy finds executors that have a free slot for a TaskGroup.
+ */
+public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
+  @VisibleForTesting
+  @Inject
+  public FreeSlotSchedulingPolicy() {
+  }
+
+  /**
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
+   *                               Executors that do not have any free slots will be filtered by this policy.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
+   */
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+    final Set<ExecutorRepresenter> candidateExecutors =
+        executorRepresenterSet.stream()
+            .filter(executor -> executor.getRunningTaskGroups().size() < executor.getExecutorCapacity())
+            .collect(Collectors.toSet());
+
+    return candidateExecutors;
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index fa5f647..ae19b66 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,10 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
-import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -28,6 +25,7 @@ import javax.inject.Inject;
 import java.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.stream.Collectors;
 
 /**
@@ -42,162 +40,33 @@ import java.util.stream.Collectors;
 public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
   private static final Logger LOG = LoggerFactory.getLogger(RoundRobinSchedulingPolicy.class.getName());
 
-  private final ExecutorRegistry executorRegistry;
-
-  /**
-   * The pool of executors available for each container type.
-   */
-  private final Map<String, List<String>> executorIdByContainerType;
-
-  /**
-   * The index of the next executor to be assigned for each container type.
-   * This map allows the executor index computation of the RR scheduling.
-   */
-  private final Map<String, Integer> nextExecutorIndexByContainerType;
-
-  @Inject
   @VisibleForTesting
-  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry) {
-    this.executorRegistry = executorRegistry;
-    this.executorIdByContainerType = new HashMap<>();
-    this.nextExecutorIndexByContainerType = new HashMap<>();
-    initializeContainerTypeIfAbsent(ExecutorPlacementProperty.NONE); // Need this to avoid potential null errors
-  }
-
-  @Override
-  public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
-                                   final JobStateManager jobStateManager) {
-    final String containerType = scheduledTaskGroup.getContainerType();
-    initializeContainerTypeIfAbsent(containerType);
-
-    Optional<String> executorId = selectExecutorByRR(containerType);
-    if (!executorId.isPresent()) { // If there is no available executor to schedule this task group now,
-      return false;
-    } else {
-      scheduleTaskGroup(executorId.get(), scheduledTaskGroup, jobStateManager);
-      return true;
-    }
-  }
-
-  @Override
-  public void onExecutorAdded(final ExecutorRepresenter executor) {
-    executorRegistry.registerRepresenter(executor);
-    final String containerType = executor.getContainerType();
-    initializeContainerTypeIfAbsent(containerType);
-
-    executorIdByContainerType.get(containerType)
-        .add(nextExecutorIndexByContainerType.get(containerType), executor.getExecutorId());
-  }
-
-  @Override
-  public Set<String> onExecutorRemoved(final String executorId) {
-    executorRegistry.setRepresenterAsFailed(executorId);
-    final ExecutorRepresenter executor = executorRegistry.getFailedExecutorRepresenter(executorId);
-    executor.onExecutorFailed();
-
-    final String containerType = executor.getContainerType();
-
-    final List<String> executorIdList = executorIdByContainerType.get(containerType);
-    int nextExecutorIndex = nextExecutorIndexByContainerType.get(containerType);
-
-    final int executorAssignmentLocation = executorIdList.indexOf(executorId);
-    if (executorAssignmentLocation < nextExecutorIndex) {
-      nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex - 1);
-    } else if (executorAssignmentLocation == nextExecutorIndex) {
-      nextExecutorIndexByContainerType.put(containerType, 0);
-    }
-    executorIdList.remove(executorId);
-
-    return Collections.unmodifiableSet(executor.getFailedTaskGroups());
-  }
-
-  @Override
-  public void onTaskGroupExecutionComplete(final String executorId, final String taskGroupId) {
-    final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
-    executor.onTaskGroupExecutionComplete(taskGroupId);
-    LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
-  }
-
-  @Override
-  public void onTaskGroupExecutionFailed(final String executorId, final String taskGroupId) {
-    final ExecutorRepresenter executor = executorRegistry.getExecutorRepresenter(executorId);
-
-    executor.onTaskGroupExecutionFailed(taskGroupId);
-    LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
-  }
-
-  @Override
-  public void terminate() {
-    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
-      final ExecutorRepresenter representer = executorRegistry.getRunningExecutorRepresenter(executorId);
-      representer.shutDown();
-      executorRegistry.setRepresenterAsCompleted(executorId);
-    }
+  @Inject
+  public RoundRobinSchedulingPolicy() {
   }
 
   /**
-   * Sticks to the RR policy to select an executor for the next task group.
-   * It checks the task groups running (as compared to each executor's capacity).
-   *
-   * @param containerType to select an executor for.
-   * @return (optionally) the selected executor.
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
    */
-  private Optional<String> selectExecutorByRR(final String containerType) {
-    final List<String> candidateExecutorIds = (containerType.equals(ExecutorPlacementProperty.NONE))
-        ? getAllContainers() // all containers
-        : executorIdByContainerType.get(containerType); // containers of a particular type
-
-    if (candidateExecutorIds != null && !candidateExecutorIds.isEmpty()) {
-      final int numExecutors = candidateExecutorIds.size();
-      int nextExecutorIndex = nextExecutorIndexByContainerType.get(containerType);
-      for (int i = 0; i < numExecutors; i++) {
-        final int index = (nextExecutorIndex + i) % numExecutors;
-        final String selectedExecutorId = candidateExecutorIds.get(index);
-
-        final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(selectedExecutorId);
-        if (hasFreeSlot(executor)) {
-          nextExecutorIndex = (index + 1) % numExecutors;
-          nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex);
-          return Optional.of(selectedExecutorId);
-        }
-      }
+  @Override
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+    final OptionalInt minOccupancy =
+        executorRepresenterSet.stream()
+        .map(executor -> executor.getRunningTaskGroups().size())
+        .mapToInt(i -> i).min();
+
+    if (!minOccupancy.isPresent()) {
+      return Collections.emptySet();
     }
 
-    return Optional.empty();
-  }
-
-  /**
-   * Schedules and sends a TaskGroup to the given executor.
-   *
-   * @param executorId         of the executor to execute the TaskGroup.
-   * @param scheduledTaskGroup to assign.
-   * @param jobStateManager    which the TaskGroup belongs to.
-   */
-  private void scheduleTaskGroup(final String executorId,
-                                 final ScheduledTaskGroup scheduledTaskGroup,
-                                 final JobStateManager jobStateManager) {
-    jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
-
-    final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
-    LOG.info("Scheduling {} to {}",
-        new Object[]{scheduledTaskGroup.getTaskGroupId(), executorId});
-    executor.onTaskGroupScheduled(scheduledTaskGroup);
-  }
-
-  private List<String> getAllContainers() {
-    return executorIdByContainerType.values().stream()
-        .flatMap(List::stream) // flatten the list of lists to a flat stream
-        .collect(Collectors.toList()); // convert the stream to a list
-  }
-
-  private boolean hasFreeSlot(final ExecutorRepresenter executor) {
-    LOG.debug("Has Free Slot: " + executor.getExecutorId());
-    LOG.debug("Running TaskGroups: " + executor.getRunningTaskGroups());
-    return executor.getRunningTaskGroups().size() < executor.getExecutorCapacity();
-  }
+    final Set<ExecutorRepresenter> candidateExecutors =
+        executorRepresenterSet.stream()
+        .filter(executor -> executor.getRunningTaskGroups().size() == minOccupancy.getAsInt())
+        .collect(Collectors.toSet());
 
-  private void initializeContainerTypeIfAbsent(final String containerType) {
-    executorIdByContainerType.putIfAbsent(containerType, new ArrayList<>());
-    nextExecutorIndexByContainerType.putIfAbsent(containerType, 0);
+    return candidateExecutors;
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 4e08467..49ca72e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,9 +15,12 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.common.state.JobState;
+import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
 import java.util.*;
@@ -27,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,23 +46,27 @@ import javax.inject.Inject;
 public final class SchedulerRunner {
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
-  private final SchedulingPolicy schedulingPolicy;
   private final PendingTaskGroupCollection pendingTaskGroupCollection;
   private final ExecutorService schedulerThread;
   private boolean initialJobScheduled;
   private boolean isTerminated;
   private final DelayedSignalingCondition mustCheckSchedulingAvailabilityOrSchedulerTerminated
       = new DelayedSignalingCondition();
+  private ExecutorRegistry executorRegistry;
+  private SchedulingPolicy schedulingPolicy;
 
+  @VisibleForTesting
   @Inject
   public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
-                         final PendingTaskGroupCollection pendingTaskGroupCollection) {
+                         final PendingTaskGroupCollection pendingTaskGroupCollection,
+                         final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
     this.pendingTaskGroupCollection = pendingTaskGroupCollection;
-    this.schedulingPolicy = schedulingPolicy;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
     this.initialJobScheduled = false;
     this.isTerminated = false;
+    this.executorRegistry = executorRegistry;
+    this.schedulingPolicy = schedulingPolicy;
   }
 
   /**
@@ -92,7 +100,11 @@ public final class SchedulerRunner {
   }
 
   void terminate() {
-    schedulingPolicy.terminate();
+    for (final String executorId : executorRegistry.getRunningExecutorIds()) {
+      final ExecutorRepresenter representer = executorRegistry.getRunningExecutorRepresenter(executorId);
+      representer.shutDown();
+      executorRegistry.setRepresenterAsCompleted(executorId);
+    }
     isTerminated = true;
     mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
   }
@@ -122,12 +134,24 @@ public final class SchedulerRunner {
         for (final ScheduledTaskGroup schedulableTaskGroup : schedulableTaskGroups) {
           final JobStateManager jobStateManager = jobStateManagers.get(schedulableTaskGroup.getJobId());
           LOG.debug("Trying to schedule {}...", schedulableTaskGroup.getTaskGroupId());
-          final boolean isScheduled =
-              schedulingPolicy.scheduleTaskGroup(schedulableTaskGroup, jobStateManager);
-          if (isScheduled) {
-            LOG.debug("Successfully scheduled {}", schedulableTaskGroup.getTaskGroupId());
+
+          final Set<ExecutorRepresenter> runningExecutorRepresenter =
+              executorRegistry.getRunningExecutorIds().stream()
+              .map(executorId -> executorRegistry.getExecutorRepresenter(executorId))
+              .collect(Collectors.toSet());
+
+          final Set<ExecutorRepresenter> candidateExecutors =
+              schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, schedulableTaskGroup);
+
+          if (candidateExecutors.size() != 0) {
+            jobStateManager.onTaskGroupStateChanged(schedulableTaskGroup.getTaskGroupId(),
+                TaskGroupState.State.EXECUTING);
+            final ExecutorRepresenter executor = candidateExecutors.stream().findFirst().get();
+            executor.onTaskGroupScheduled(schedulableTaskGroup);
+
             pendingTaskGroupCollection.remove(schedulableTaskGroup.getTaskGroupId());
             numScheduledTaskGroups++;
+            LOG.debug("Successfully scheduled {}", schedulableTaskGroup.getTaskGroupId());
           } else {
             LOG.debug("Failed to schedule {}", schedulableTaskGroup.getTaskGroupId());
           }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 061adae..b0e39ac 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -29,61 +28,9 @@ import java.util.Set;
  */
 @DriverSide
 @ThreadSafe
-@DefaultImplementation(SourceLocationAwareSchedulingPolicy.class)
+@FunctionalInterface
+@DefaultImplementation(CompositeSchedulingPolicy.class)
 public interface SchedulingPolicy {
-
-  /**
-   * Attempts to schedule the given taskGroup to an executor according to this policy.
-   * If there is no executor available for the taskGroup, it waits for an executor to be assigned before it times out.
-   * (Depending on the executor's resource type)
-   *
-   * @param scheduledTaskGroup to schedule.
-   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
-   * @return true if the task group is successfully scheduled, false otherwise.
-   */
-  boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup, final JobStateManager jobStateManager);
-
-  /**
-   * Adds the executorId to the pool of available executors.
-   * Unlocks this policy to schedule a next taskGroup if locked.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorRepresenter for the executor that has been added.
-   */
-  void onExecutorAdded(ExecutorRepresenter executorRepresenter);
-
-  /**
-   * Deletes the executorId from the pool of available executors.
-   * Locks this policy from scheduling if there is no more executor currently available for the next taskGroup.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorId for the executor that has been deleted.
-   * @return the ids of the set of task groups that were running on the executor.
-   */
-  Set<String> onExecutorRemoved(String executorId);
-
-  /**
-   * Marks the taskGroup's completion in the executor.
-   * Unlocks this policy to schedule a next taskGroup if locked.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorId of the executor where the taskGroup's execution has completed.
-   * @param taskGroupId whose execution has completed.
-   */
-  void onTaskGroupExecutionComplete(String executorId, String taskGroupId);
-
-  /**
-   * Marks the taskGroup's failure in the executor.
-   * Unlocks this policy to reschedule this taskGroup if locked.
-   * (Depending on the executor's resource type)
-   *
-   * @param executorId of the executor where the taskGroup's execution has failed.
-   * @param taskGroupId whose execution has completed.
-   */
-  void onTaskGroupExecutionFailed(String executorId, String taskGroupId);
-
-  /**
-   * End of scheduling.
-   */
-  void terminate();
+  Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
+                                                      final ScheduledTaskGroup scheduledTaskGroup);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index 9c69253..c5eee7f 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -15,11 +15,9 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
-import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -28,9 +26,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * This policy is same as {@link RoundRobinSchedulingPolicy}, however for TaskGroups
@@ -42,130 +38,51 @@ import java.util.stream.Stream;
 public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy {
   private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class);
 
-  private final ExecutorRegistry executorRegistry;
-  private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
+  @VisibleForTesting
+  @Inject
+  public SourceLocationAwareSchedulingPolicy() {
+  }
 
   /**
-   * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}.
-   * @param executorRegistry provides catalog of available executors
-   * @param roundRobinSchedulingPolicy provides fallback for TaskGroups with no input source information
+   * @param readables collection of readables
+   * @return Set of source locations gathered from the given {@code readables}
+   * @throws Exception for any exception raised during querying source locations for a readable
    */
-  @Inject
-  private SourceLocationAwareSchedulingPolicy(final ExecutorRegistry executorRegistry,
-                                              final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy) {
-    this.executorRegistry = executorRegistry;
-    this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
+  private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
+    final List<String> sourceLocations = new ArrayList<>();
+    for (final Readable readable : readables) {
+      sourceLocations.addAll(readable.getLocations());
+    }
+    return new HashSet<>(sourceLocations);
   }
 
   /**
-   * Try to schedule a TaskGroup.
-   * If the TaskGroup has one or more source tasks, this method schedules the task group to one of the physical nodes,
-   * chosen from union of set of locations where splits of each source task resides.
-   * If the TaskGroup has no source tasks, falls back to {@link RoundRobinSchedulingPolicy}.
-   * @param scheduledTaskGroup to schedule.
-   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
-   * @return true if the task group is successfully scheduled, false otherwise.
+   * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
+   *                               If there are no source locations, the original set is returned.
+   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
-  public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
-                                   final JobStateManager jobStateManager) {
-    Set<String> sourceLocations = Collections.emptySet();
+  public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
+                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+    final Set<String> sourceLocations;
     try {
       sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
-      // do nothing
+      return executorRepresenterSet;
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
-    if (sourceLocations.size() == 0) {
-      // No source location information found, fall back to the RoundRobinSchedulingPolicy
-      return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, jobStateManager);
-    }
-
-    return scheduleToLocalNode(scheduledTaskGroup, jobStateManager, sourceLocations);
-  }
 
-  /**
-   * Try to schedule a TaskGroup with source task.
-   * @param scheduledTaskGroup TaskGroup to schedule
-   * @param jobStateManager {@link JobStateManager}
-   * @param jobStateManager jobStateManager which the TaskGroup belongs to.
-   * @return true if the task group is successfully scheduled, false otherwise.
-   */
-  private boolean scheduleToLocalNode(final ScheduledTaskGroup scheduledTaskGroup,
-                                      final JobStateManager jobStateManager,
-                                      final Set<String> sourceLocations) {
-    final List<ExecutorRepresenter> candidateExecutors =
-        selectExecutorByContainerTypeAndNodeNames(scheduledTaskGroup.getContainerType(), sourceLocations);
-    if (candidateExecutors.size() == 0) {
-      return false;
+    if (sourceLocations.size() == 0) {
+      return executorRepresenterSet;
     }
-    final int randomIndex = ThreadLocalRandom.current().nextInt(0, candidateExecutors.size());
-    final ExecutorRepresenter selectedExecutor = candidateExecutors.get(randomIndex);
 
-    jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
-    selectedExecutor.onTaskGroupScheduled(scheduledTaskGroup);
-    LOG.info("Scheduling {} (source location: {}) to {} (node name: {})", scheduledTaskGroup.getTaskGroupId(),
-        String.join(", ", sourceLocations), selectedExecutor.getExecutorId(),
-        selectedExecutor.getNodeName());
-    return true;
-  }
-
-  @Override
-  public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    roundRobinSchedulingPolicy.onExecutorAdded(executorRepresenter);
-  }
-
-  @Override
-  public Set<String> onExecutorRemoved(final String executorId) {
-    return roundRobinSchedulingPolicy.onExecutorRemoved(executorId);
-  }
+    final Set<ExecutorRepresenter> candidateExecutors =
+            executorRepresenterSet.stream()
+            .filter(executor -> sourceLocations.contains(executor.getNodeName()))
+            .collect(Collectors.toSet());
 
-  @Override
-  public void onTaskGroupExecutionComplete(final String executorId, final String taskGroupId) {
-    roundRobinSchedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
-  }
-
-  @Override
-  public void onTaskGroupExecutionFailed(final String executorId, final String taskGroupId) {
-    roundRobinSchedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
-  }
-
-  @Override
-  public void terminate() {
-    roundRobinSchedulingPolicy.terminate();
-  }
-
-  /**
-   * @param containerType type of the desired container type
-   * @param nodeNames set of node names
-   * @return list of executors, which resides in one of {@code nodeNames}, has container type of {@code containerType},
-   *         and has an empty slot for execution
-   */
-  private List<ExecutorRepresenter> selectExecutorByContainerTypeAndNodeNames(
-    final String containerType, final Set<String> nodeNames) {
-    final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = executorRegistry.getRunningExecutorIds().stream()
-        .map(executorId -> executorRegistry.getRunningExecutorRepresenter(executorId))
-        .filter(executor -> executor.getRunningTaskGroups().size() < executor.getExecutorCapacity())
-        .filter(executor -> nodeNames.contains(executor.getNodeName()));
-    if (containerType.equals(ExecutorPlacementProperty.NONE)) {
-      return localNodesWithSpareCapacity.collect(Collectors.toList());
-    } else {
-      return localNodesWithSpareCapacity.filter(executor -> executor.getContainerType().equals(containerType))
-          .collect(Collectors.toList());
-    }
-  }
-
-  /**
-   * @param readables collection of readables
-   * @return Set of source locations from source tasks in {@code taskGroupDAG}
-   * @throws Exception for any exception raised during querying source locations for a readable
-   */
-  private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
-    final List<String> sourceLocations = new ArrayList<>();
-    for (final Readable readable : readables) {
-      sourceLocations.addAll(readable.getLocations());
-    }
-    return new HashSet<>(sourceLocations);
+    return candidateExecutors;
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
index f95c452..49d5d4d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
@@ -107,12 +107,24 @@ public final class RuntimeTestUtil {
   public static void mockSchedulerRunner(final PendingTaskGroupCollection pendingTaskGroupCollection,
                                          final SchedulingPolicy schedulingPolicy,
                                          final JobStateManager jobStateManager,
+                                         final ExecutorRegistry executorRegistry,
                                          final boolean isPartialSchedule) {
     while (!pendingTaskGroupCollection.isEmpty()) {
       final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupCollection.remove(
           pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
 
-      schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager);
+      final Set<ExecutorRepresenter> runningExecutorRepresenter =
+          executorRegistry.getRunningExecutorIds().stream()
+              .map(executorId -> executorRegistry.getExecutorRepresenter(executorId))
+              .collect(Collectors.toSet());
+      final Set<ExecutorRepresenter> candidateExecutors =
+          schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, taskGroupToSchedule);
+      if (candidateExecutors.size() > 0) {
+        jobStateManager.onTaskGroupStateChanged(taskGroupToSchedule.getTaskGroupId(),
+            TaskGroupState.State.EXECUTING);
+        final ExecutorRepresenter executor = candidateExecutors.stream().findFirst().get();
+        executor.onTaskGroupScheduled(taskGroupToSchedule);
+      }
 
       // Schedule only the first task group.
       if (isPartialSchedule) {
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index ff16bf4..7131b86 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -74,9 +74,11 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.awt.*;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 
@@ -128,17 +130,17 @@ public final class DataTransferTest {
     injector.bindVolatileInstance(EvaluatorRequestor.class, mock(EvaluatorRequestor.class));
     injector.bindVolatileInstance(MessageEnvironment.class, messageEnvironment);
     final ContainerManager containerManager = injector.getInstance(ContainerManager.class);
+    final ExecutorRegistry executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
     final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class);
     final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
-    final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy(
-        injector.getInstance(ExecutorRegistry.class));
+    final SchedulingPolicy schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
     final PendingTaskGroupCollection taskGroupQueue = new SingleJobTaskGroupCollection();
-    final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskGroupQueue);
+    final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskGroupQueue, executorRegistry);
     final Scheduler scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, taskGroupQueue, master,
-            pubSubEventHandler, updatePhysicalPlanEventHandler);
+            pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
 
     // Necessary for wiring up the message environments
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index c87bc81..ab39a96 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -102,13 +102,13 @@ public final class BatchSingleJobSchedulerTest {
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
     pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection);
+    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
+    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection, executorRegistry);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupCollection,
-            blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler);
+            blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
new file mode 100644
index 0000000..1f9462a
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.master.scheduler;
+
+import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.scheduler.ContainerTypeAwareSchedulingPolicy;
+import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests {@link ContainerTypeAwareSchedulingPolicy}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
+public final class ContainerTypeAwareSchedulingPolicyTest {
+
+  private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
+    final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
+    when(executorRepresenter.getContainerType()).thenReturn(containerType);
+    return executorRepresenter;
+  }
+
+  @Test
+  public void testContainerTypeAware() {
+    final SchedulingPolicy schedulingPolicy = new ContainerTypeAwareSchedulingPolicy();
+    final ExecutorRepresenter a0 = mockExecutorRepresenter(ExecutorPlacementProperty.TRANSIENT);
+    final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
+    final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
+
+    final ScheduledTaskGroup scheduledTaskGroup1 = mock(ScheduledTaskGroup.class);
+    when(scheduledTaskGroup1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+
+    final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
+
+    final Set<ExecutorRepresenter> candidateExecutors1 =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, scheduledTaskGroup1);
+
+    final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
+    assertEquals(expectedExecutors1, candidateExecutors1);
+
+    final ScheduledTaskGroup scheduledTaskGroup2 = mock(ScheduledTaskGroup.class);
+    when(scheduledTaskGroup2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+
+    final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
+
+    final Set<ExecutorRepresenter> candidateExecutors2 =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, scheduledTaskGroup2);
+
+    final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
+    assertEquals(expectedExecutors2, candidateExecutors2);
+  }
+}
+
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
index 8bfd993..f641924 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
@@ -108,19 +108,20 @@ public final class FaultToleranceTest {
 
   private void setUpExecutors(final Collection<ExecutorRepresenter> executors,
                               final boolean useMockSchedulerRunner) throws InjectionException {
-    executorRegistry = Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
+    final Injector injector = Tang.Factory.getTang().newInjector();
+    executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
     pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
+    schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
     } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection);
+      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection, executorRegistry);
     }
     scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupCollection,
-            blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler);
+            blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
 
     // Add nodes
     for (final ExecutorRepresenter executor : executors) {
@@ -207,14 +208,17 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        scheduler.onExecutorRemoved("a3");
+        // There are 2 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running TaskGroup.
         scheduler.onExecutorRemoved("a2");
@@ -224,30 +228,17 @@ public final class FaultToleranceTest {
         }
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2);
 
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else {
-        // There are 2 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 2.
+        // There is 1 executor of capacity 2, and there are 2 TaskGroups in ScheduleGroup 2.
         // Schedule only the first TaskGroup
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, true);
-
-        boolean first = true;
-        for (final String taskGroupId : stage.getTaskGroupIds()) {
-          // When a TaskGroup fails while the siblings are still in the queue,
-          if (first) {
-            // Due to round robin scheduling, "a3" is assured to have a running TaskGroup.
-            scheduler.onExecutorRemoved("a3");
-            first = false;
-          } else {
-            // Test that the sibling TaskGroup state remains unchanged.
-            assertEquals(
-                jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(),
-                TaskGroupState.State.READY);
-          }
-        }
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, true);
       }
     }
   }
@@ -284,14 +275,16 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
@@ -344,14 +337,16 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, false);
         assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+            executorRegistry, false);
 
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
new file mode 100644
index 0000000..5308ab6
--- /dev/null
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.tests.runtime.master.scheduler;
+
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import edu.snu.nemo.runtime.master.scheduler.FreeSlotSchedulingPolicy;
+import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests {@link FreeSlotSchedulingPolicy}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
+public final class FreeSlotSchedulingPolicyTest {
+
+  private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTaskGroups,
+                                                             final int capacity) {
+    final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
+    final Set<String> runningTaskGroups = new HashSet<>();
+    IntStream.range(0, numRunningTaskGroups).forEach(i -> runningTaskGroups.add(String.valueOf(i)));
+    when(executorRepresenter.getRunningTaskGroups()).thenReturn(runningTaskGroups);
+    when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
+    return executorRepresenter;
+  }
+
+  @Test
+  public void testFreeSlot() {
+    final SchedulingPolicy schedulingPolicy = new FreeSlotSchedulingPolicy();
+    final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
+    final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
+
+    final ScheduledTaskGroup scheduledTaskGroup = mock(ScheduledTaskGroup.class);
+
+    final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
+
+    final Set<ExecutorRepresenter> candidateExecutors =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTaskGroup);
+
+    final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
+    assertEquals(expectedExecutors, candidateExecutors);
+  }
+}
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index 22068a5..6061fc0 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2017 Seoul National University
+ * Copyright (C) 2018 Seoul National University
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -15,130 +15,52 @@
  */
 package edu.snu.nemo.tests.runtime.master.scheduler;
 
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.comm.ControlMessage;
-import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
 import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
-import org.apache.reef.driver.context.ActiveContext;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.Function;
+import java.util.stream.IntStream;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
 
 /**
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(JobStateManager.class)
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
 public final class RoundRobinSchedulingPolicyTest {
-  private SchedulingPolicy schedulingPolicy;
-  private ExecutorRegistry executorRegistry;
-  private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
-  private JobStateManager jobStateManager = mock(JobStateManager.class);
 
-  // This schedule index will make sure that task group events are not ignored
-  private static final int MAGIC_SCHEDULE_ATTEMPT_INDEX = Integer.MAX_VALUE;
-  private static final String RESERVED_EXECUTOR_ID = "RESERVED";
-
-  @Before
-  public void setUp() throws InjectionException {
-    executorRegistry = Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
-
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
-
-    final ActiveContext activeContext = mock(ActiveContext.class);
-    Mockito.doThrow(new RuntimeException()).when(activeContext).close();
-
-    final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
-    final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 1, 0);
-    final Function<String, ExecutorRepresenter> computeSpecExecutorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter a3 = computeSpecExecutorRepresenterGenerator.apply("a3");
-    final ExecutorRepresenter a2 = computeSpecExecutorRepresenterGenerator.apply("a2");
-    final ExecutorRepresenter a1 = computeSpecExecutorRepresenterGenerator.apply("a1");
-
-    final ResourceSpecification storageSpec = new ResourceSpecification(ExecutorPlacementProperty.TRANSIENT, 1, 0);
-    final Function<String, ExecutorRepresenter> storageSpecExecutorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, storageSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter b2 = storageSpecExecutorRepresenterGenerator.apply("b2");
-    final ExecutorRepresenter b1 = storageSpecExecutorRepresenterGenerator.apply("b1");
-
-    final ResourceSpecification reservedSpec = new ResourceSpecification(ExecutorPlacementProperty.RESERVED, 1, 0);
-    final Function<String, ExecutorRepresenter> reservedSpecExecutorRepresenterGenerator = executorId ->
-        new ExecutorRepresenter(executorId, reservedSpec, mockMsgSender, activeContext, serExecutorService, executorId);
-    final ExecutorRepresenter r = reservedSpecExecutorRepresenterGenerator.apply(RESERVED_EXECUTOR_ID);
-
-    // Add compute nodes
-    schedulingPolicy.onExecutorAdded(a3);
-    schedulingPolicy.onExecutorAdded(a2);
-    schedulingPolicy.onExecutorAdded(a1);
-
-    // Add storage nodes
-    schedulingPolicy.onExecutorAdded(b2);
-    schedulingPolicy.onExecutorAdded(b1);
-
-    // Add reserved node
-    schedulingPolicy.onExecutorAdded(r);
+  private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTaskGroups) {
+    final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
+    final Set<String> runningTaskGroups = new HashSet<>();
+    IntStream.range(0, numRunningTaskGroups).forEach(i -> runningTaskGroups.add(String.valueOf(i)));
+    when(executorRepresenter.getRunningTaskGroups()).thenReturn(runningTaskGroups);
+    return executorRepresenter;
   }
 
   @Test
-  public void testNoneContainerType() {
-    final int slots = 6;
-    final List<ScheduledTaskGroup> scheduledTaskGroups =
-        convertToScheduledTaskGroups(slots + 1, new byte[0], "Stage A", ExecutorPlacementProperty.NONE);
+  public void testRoundRobin() {
+    final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy();
+    final ExecutorRepresenter a0 = mockExecutorRepresenter(1);
+    final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
+    final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
 
-    boolean isScheduled;
-    for (int i = 0; i < slots; i++) {
-      isScheduled = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(i), jobStateManager);
-      assertTrue(isScheduled);
-    }
+    final ScheduledTaskGroup scheduledTaskGroup = mock(ScheduledTaskGroup.class);
 
-    // No more slot
-    isScheduled = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(slots), jobStateManager);
-    assertFalse(isScheduled);
-  }
+    final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
+
+    final Set<ExecutorRepresenter> candidateExecutors =
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTaskGroup);
 
-  /**
-   * Wrap a DAG of a task group into {@link ScheduledTaskGroup}s.
-   *
-   * @param parallelism            how many scheduled task group will be generated.
-   * @param serializedTaskGroupDag the serialized DAG of the task group.
-   * @param stageId                the ID of the stage.
-   * @param containerType          the type of container to execute the task group on.
-   * @return the wrapped scheduled task groups.
-   */
-  private List<ScheduledTaskGroup> convertToScheduledTaskGroups(final int parallelism,
-                                                                final byte[] serializedTaskGroupDag,
-                                                                final String stageId,
-                                                                final String containerType) {
-    final List<ScheduledTaskGroup> scheduledTaskGroups = new ArrayList<>(parallelism);
-    for (int taskGroupIdx = 0; taskGroupIdx < parallelism; taskGroupIdx++) {
-      final String taskGroupId = RuntimeIdGenerator.generateTaskGroupId(taskGroupIdx, stageId);
-      scheduledTaskGroups.add(
-          new ScheduledTaskGroup("TestPlan", serializedTaskGroupDag, taskGroupId, Collections.emptyList(),
-              Collections.emptyList(), MAGIC_SCHEDULE_ATTEMPT_INDEX, containerType, Collections.emptyMap()));
-    }
-    return scheduledTaskGroups;
+    final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
+    assertEquals(expectedExecutors, candidateExecutors);
   }
 }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 143ecb4..810bb31 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,21 +15,11 @@
  */
 package edu.snu.nemo.tests.runtime.master.scheduler;
 
-import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy;
-import org.apache.reef.tang.Injector;
-import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -37,91 +27,24 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
 
 /**
- * Test cases for
+ * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({JobStateManager.class, ExecutorRepresenter.class, RoundRobinSchedulingPolicy.class,
-    ScheduledTaskGroup.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class, Readable.class})
 public final class SourceLocationAwareSchedulingPolicyTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
   private static final String SITE_2 = "BUSAN";
 
-  private SourceLocationAwareSchedulingPolicy sourceLocationAware;
-  private SpiedSchedulingPolicyWrapper<RoundRobinSchedulingPolicy> roundRobin;
-  private MockJobStateManagerWrapper jobStateManager;
-
-  @Before
-  public void setup() {
-    final Injector injector = Tang.Factory.getTang().newInjector();
-    jobStateManager = new MockJobStateManagerWrapper();
-
-    final ExecutorRegistry executorRegistry = new ExecutorRegistry();
-    final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy =
-        new RoundRobinSchedulingPolicy(executorRegistry);
-    roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, jobStateManager.get());
-
-    injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, roundRobin.get());
-    injector.bindVolatileInstance(JobStateManager.class, jobStateManager.get());
-    injector.bindVolatileInstance(ExecutorRegistry.class, executorRegistry);
-    try {
-      sourceLocationAware = injector.getInstance(SourceLocationAwareSchedulingPolicy.class);
-    } catch (final InjectionException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @After
-  public void teardown() {
-    // All expectations should be resolved at this time.
-    roundRobin.ensureNoUnresolvedExpectation();
-  }
-
-  /**
-   * {@link SourceLocationAwareSchedulingPolicy} should delegate scheduling decision when the
-   * {@link ScheduledTaskGroup} does not have any source tasks.
-   */
-  @Test
-  public void testRoundRobinSchedulerFallback() {
-    // Prepare test scenario
-    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE);
-    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2,
-        ExecutorPlacementProperty.NONE);
-    final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWhichThrowException(5,
-        ExecutorPlacementProperty.NONE);
-    addExecutor(new MockExecutorRepresenterWrapper(SITE_0, ExecutorPlacementProperty.NONE, 1));
-    addExecutor(new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
-
-    // Trying to schedule tg0: expected to fall back to RoundRobinSchedulingPolicy
-    roundRobin.expectSchedulingRequest(tg0);
-    // ...and scheduling attempt must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
-    // ...thus the TaskGroup should be running
-    jobStateManager.assertTaskGroupState(tg0.getTaskGroupId(), TaskGroupState.State.EXECUTING);
-
-    // Trying to schedule tg1: expected to fall back to RoundRobinSchedulingPolicy
-    roundRobin.expectSchedulingRequest(tg1);
-    // ...and scheduling attempt must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
-    // ...thus the TaskGroup should be running
-    jobStateManager.assertTaskGroupState(tg1.getTaskGroupId(), TaskGroupState.State.EXECUTING);
-
-    // Trying to schedule tg2: expected to fall back to RoundRobinSchedulingPolicy
-    roundRobin.expectSchedulingRequest(tg2);
-    // ...and scheduling attempt must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, jobStateManager.get()));
-    // ...thus the TaskGroup should be running
-    jobStateManager.assertTaskGroupState(tg2.getTaskGroupId(), TaskGroupState.State.EXECUTING);
+  private static ExecutorRepresenter mockExecutorRepresenter(final String executorId) {
+    final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
+    when(executorRepresenter.getNodeName()).thenReturn(executorId);
+    return executorRepresenter;
   }
 
   /**
@@ -130,83 +53,16 @@ public final class SourceLocationAwareSchedulingPolicyTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() {
-    // Prepare test scenario
-    final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), ExecutorPlacementProperty.NONE);
-    final MockExecutorRepresenterWrapper e0 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e1 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
-
-    // Attempt to schedule tg must fail (fallback to round robin policy is not expected)
-    assertFalse(sourceLocationAware.scheduleTaskGroup(tg, jobStateManager.get()));
-    // Thus executors should have no running TaskGroups at all
-    e0.assertScheduledTaskGroups(Collections.emptyList());
-    e1.assertScheduledTaskGroups(Collections.emptyList());
-  }
+    final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
 
-  private static final String CONTAINER_TYPE_A = "A";
-
-  /**
-   * {@link SourceLocationAwareSchedulingPolicy} should schedule TG to one of the executors with appropriate
-   * location and container type.
-   */
-  @Test
-  public void testSourceLocationAwareSchedulingWithContainerType() {
     // Prepare test scenario
     final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e0 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_0, ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e1 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e2 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_2, ExecutorPlacementProperty.NONE, 1));
-    final MockExecutorRepresenterWrapper e3 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_0, CONTAINER_TYPE_A, 1));
-    final MockExecutorRepresenterWrapper e4 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1));
-    final MockExecutorRepresenterWrapper e5 = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_2, CONTAINER_TYPE_A, 1));
-
-    // Attempt to schedule tg must success (fallback to round robin is not expected)
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg, jobStateManager.get()));
-    // tg must run on e3
-    e0.assertScheduledTaskGroups(Collections.emptyList());
-    e1.assertScheduledTaskGroups(Collections.emptyList());
-    e2.assertScheduledTaskGroups(Collections.emptyList());
-    e3.assertScheduledTaskGroups(Collections.singletonList(tg));
-    e4.assertScheduledTaskGroups(Collections.emptyList());
-    e5.assertScheduledTaskGroups(Collections.emptyList());
-  }
-
-  /**
-   * {@link SourceLocationAwareSchedulingPolicy} should not schedule more TGs than executor capacity allows.
-   */
-  @Test
-  public void testSourceLocationAwareSchedulingDoesNotOverSchedule() {
-    // Prepare test scenario
-    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg3 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_0, CONTAINER_TYPE_A, 3));
-
-    // Attempt to schedule TG must success (fallback to round robin is not expected)
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, jobStateManager.get()));
+        Collections.singletonList(Collections.singletonList(SITE_0)));
+    final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
+    final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
 
-    // This must fail
-    assertFalse(sourceLocationAware.scheduleTaskGroup(tg3, jobStateManager.get()));
-
-    // Expected executor status
-    e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2));
+    assertEquals(Collections.emptySet(),
+        schedulingPolicy.filterExecutorRepresenters(new HashSet<>(Arrays.asList(e0, e1)), tg));
   }
 
   /**
@@ -214,34 +70,26 @@ public final class SourceLocationAwareSchedulingPolicyTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() {
+    final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
+        Collections.singletonList(Collections.singletonList(SITE_1)));
     final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)), CONTAINER_TYPE_A);
+        Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
     final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
-            Arrays.asList(SITE_1, SITE_2)), CONTAINER_TYPE_A);
+            Arrays.asList(SITE_1, SITE_2)));
     final ScheduledTaskGroup tg3 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
-            Arrays.asList(SITE_0, SITE_2)), CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = addExecutor(
-        new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 4));
-
-    // Attempt to schedule TG must success (fallback to round robin is not expected)
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg2, jobStateManager.get()));
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg3, jobStateManager.get()));
+            Arrays.asList(SITE_0, SITE_2)));
 
-    // Expected executor status
-    e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2, tg3));
+    final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
+    for (final ScheduledTaskGroup tg : new HashSet<>(Arrays.asList(tg0, tg1, tg2, tg3))) {
+      assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
+          new HashSet<>(Collections.singletonList(e)), tg));
+    }
   }
 
-  private MockExecutorRepresenterWrapper addExecutor(final MockExecutorRepresenterWrapper executor) {
-    sourceLocationAware.onExecutorAdded(executor.get());
-    return executor;
-  }
 
   /**
    * Utility for creating {@link ScheduledTaskGroup}.
@@ -250,19 +98,17 @@ public final class SourceLocationAwareSchedulingPolicyTest {
     private static final AtomicInteger taskGroupIndex = new AtomicInteger(0);
     private static final AtomicInteger taskIndex = new AtomicInteger(0);
 
-    private static ScheduledTaskGroup doCreate(final Collection<Readable> readables, final String containerType) {
+    private static ScheduledTaskGroup doCreate(final Collection<Readable> readables) {
       final ScheduledTaskGroup mockInstance = mock(ScheduledTaskGroup.class);
       final Map<String, Readable> readableMap = new HashMap<>();
       readables.forEach(readable -> readableMap.put(String.format("TASK-%d", taskIndex.getAndIncrement()),
           readable));
       when(mockInstance.getTaskGroupId()).thenReturn(String.format("TG-%d", taskGroupIndex.getAndIncrement()));
       when(mockInstance.getLogicalTaskIdToReadable()).thenReturn(readableMap);
-      when(mockInstance.getContainerType()).thenReturn(containerType);
       return mockInstance;
     }
 
-    static ScheduledTaskGroup withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation,
-                                                               final String containerType) {
+    static ScheduledTaskGroup withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (final List<String> locations : sourceLocation) {
@@ -270,14 +116,13 @@ public final class SourceLocationAwareSchedulingPolicyTest {
           when(readable.getLocations()).thenReturn(locations);
           readables.add(readable);
         }
-        return doCreate(readables, containerType);
+        return doCreate(readables);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    static ScheduledTaskGroup withReadablesWithoutSourceLocations(final int numReadables,
-                                                                  final String containerType) {
+    static ScheduledTaskGroup withReadablesWithoutSourceLocations(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -285,14 +130,13 @@ public final class SourceLocationAwareSchedulingPolicyTest {
           when(readable.getLocations()).thenReturn(Collections.emptyList());
           readables.add(readable);
         }
-        return doCreate(readables, containerType);
+        return doCreate(readables);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    static ScheduledTaskGroup withReadablesWhichThrowException(final int numReadables,
-                                                               final String containerType) {
+    static ScheduledTaskGroup withReadablesWhichThrowException(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -300,128 +144,14 @@ public final class SourceLocationAwareSchedulingPolicyTest {
           when(readable.getLocations()).thenThrow(new UnsupportedOperationException());
           readables.add(readable);
         }
-        return doCreate(readables, containerType);
+        return doCreate(readables);
       } catch (final Exception e) {
         throw new RuntimeException(e);
       }
     }
 
-    static ScheduledTaskGroup withoutReadables(final String containerType) {
-      return doCreate(Collections.emptyList(), containerType);
-    }
-  }
-
-  /**
-   * Wrapper for mock {@link ExecutorRepresenter}.
-   */
-  private static final class MockExecutorRepresenterWrapper {
-    private static final AtomicInteger executorIndex = new AtomicInteger(0);
-
-    private final ExecutorRepresenter mockInstance;
-    private final List<ScheduledTaskGroup> scheduledTaskGroups = new ArrayList<>();
-
-    MockExecutorRepresenterWrapper(final String nodeName, final String containerType, final int capacity) {
-      mockInstance = mock(ExecutorRepresenter.class);
-      doAnswer(invocationOnMock -> {
-        final ScheduledTaskGroup scheduledTaskGroup = invocationOnMock.getArgument(0);
-        scheduledTaskGroups.add(scheduledTaskGroup);
-        return null;
-      }).when(mockInstance).onTaskGroupScheduled(any(ScheduledTaskGroup.class));
-      doAnswer(invocationOnMock -> {
-        final String taskGroupId = invocationOnMock.getArgument(0);
-        scheduledTaskGroups.removeIf(scheduledTaskGroup -> scheduledTaskGroup.getTaskGroupId().equals(taskGroupId));
-        return null;
-      }).when(mockInstance).onTaskGroupExecutionComplete(anyString());
-      when(mockInstance.getExecutorId()).thenReturn(String.format("EXECUTOR-%d", executorIndex.getAndIncrement()));
-      when(mockInstance.getNodeName()).thenReturn(nodeName);
-      when(mockInstance.getContainerType()).thenReturn(containerType);
-      doAnswer(invocationOnMock ->
-          scheduledTaskGroups.stream().map(ScheduledTaskGroup::getTaskGroupId).collect(Collectors.toSet()))
-          .when(mockInstance).getRunningTaskGroups();
-      when(mockInstance.getExecutorCapacity()).thenReturn(capacity);
-    }
-
-    void assertScheduledTaskGroups(final List<ScheduledTaskGroup> expected) {
-      assertEquals(expected, scheduledTaskGroups);
-    }
-
-    ExecutorRepresenter get() {
-      return mockInstance;
-    }
-  }
-
-  /**
-   * Wrapper for spied {@link SchedulingPolicy}.
-   * @param <T> the class of the spied instance
-   */
-  private static final class SpiedSchedulingPolicyWrapper<T extends SchedulingPolicy> {
-    private final T spiedInstance;
-    private ScheduledTaskGroup expectedArgument = null;
-
-    SpiedSchedulingPolicyWrapper(final T schedulingPolicy, final JobStateManager jobStateManager) {
-      spiedInstance = spy(schedulingPolicy);
-      doAnswer(invocationOnMock -> {
-        final ScheduledTaskGroup scheduledTaskGroup = invocationOnMock.getArgument(0);
-        assertEquals(expectedArgument, scheduledTaskGroup);
-        expectedArgument = null;
-        jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
-        return true;
-      }).when(spiedInstance).scheduleTaskGroup(any(ScheduledTaskGroup.class), any());
-    }
-
-    /**
-     * Sets expected {@link SchedulingPolicy#scheduleTaskGroup(ScheduledTaskGroup, JobStateManager)} invocation
-     * on this spied object.
-     * @param scheduledTaskGroup expected parameter for the task group to schedule
-     */
-    void expectSchedulingRequest(final ScheduledTaskGroup scheduledTaskGroup) {
-      ensureNoUnresolvedExpectation();
-      this.expectedArgument = scheduledTaskGroup;
-    }
-
-    void ensureNoUnresolvedExpectation() {
-      assertEquals(null, expectedArgument);
-    }
-
-    /**
-     * @return spied instance for {@link SchedulingPolicy}.
-     */
-    T get() {
-      return spiedInstance;
-    }
-  }
-
-  /**
-   * Wrapper for mock {@link JobStateManager} instance.
-   */
-  private static final class MockJobStateManagerWrapper {
-    private final JobStateManager mockInstance;
-    private final Map<String, TaskGroupState.State> taskGroupStates = new HashMap<>();
-
-    MockJobStateManagerWrapper() {
-      mockInstance = mock(JobStateManager.class);
-      doAnswer(invocationOnMock -> {
-        final String taskGroupId = invocationOnMock.getArgument(0);
-        final TaskGroupState.State newState = invocationOnMock.getArgument(1);
-        taskGroupStates.put(taskGroupId, newState);
-        return null;
-      }).when(mockInstance).onTaskGroupStateChanged(anyString(), any(TaskGroupState.State.class));
-    }
-
-    /**
-     * Ensures the TaskGroup state has been changed as expected.
-     * @param taskGroupId id of the TaskGroup
-     * @param state the expected state
-     */
-    void assertTaskGroupState(final String taskGroupId, final TaskGroupState.State state) {
-      assertEquals(state, taskGroupStates.get(taskGroupId));
-    }
-
-    /**
-     * @return mock instance for {@link JobStateManager}.
-     */
-    JobStateManager get() {
-      return mockInstance;
+    static ScheduledTaskGroup withoutReadables() {
+      return doCreate(Collections.emptyList());
     }
   }
 }

-- 
To stop receiving notification emails like this one, please contact
jangho@apache.org.

Mime
View raw message