nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sa...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-42] Make SchedulerRunner reactive, Ensure reverse-topological ordering in scheduling (#11)
Date Mon, 02 Apr 2018 03:02:02 GMT
This is an automated email from the ASF dual-hosted git repository.

sanha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new f03ed05  [NEMO-42] Make SchedulerRunner reactive, Ensure reverse-topological ordering in scheduling (#11)
f03ed05 is described below

commit f03ed05a8e977cd41e8db9bdf7238d1304be5929
Author: JangHo Seo <jangho@jangho.io>
AuthorDate: Mon Apr 2 12:01:59 2018 +0900

    [NEMO-42] Make SchedulerRunner reactive, Ensure reverse-topological ordering in scheduling (#11)
    
    JIRA: [NEMO-42: Make SchedulerRunner reactive, Ensure reverse-topological ordering in scheduling](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-42)
    
    **Major changes:**
    - Removed `SchedulerTimeoutMs` property.
    - Modified SchedulerRunner to
        - check schedulability of all TaskGroups in PendingTaskGroupQueue in one iteration
        - sleep before doing the next iteration if there were no events indicating undiscovered schedulability (ExecutorAdded, TaskGroupCompleted, ...) since the start of the current iteration
        - awake from sleep based on events indicating undiscovered schedulability
    - Added `PendingTaskGroupQueue#peekSchedulableTaskGroups` method, which peeks multiple schedulable TaskGroups at once, to enable SchedulerRunner to check multiple TaskGroups in one iteration.
    - Removed awaiting mechanism in SchedulingPolicy implementations. Related test code is also removed.
    - Dropped `PendingTaskGroupQueue#dequeue` and added `PendingTaskGroupQueue#remove` method. This enables SchedulerRunner to remove the scheduled TaskGroup after the scheduling has succeeded, not before. This fixes a bug where the scheduler often violates reverse-topological stage ordering.
    
    **Minor changes to note:**
    - Fixed a problem where SchedulerRunner didn't schedule TaskGroups with the same ScheduleGroup property in reverse-topological order.
    - Modified `sample_executor_resources.json` to execute integration tests with enough resources. Integration tests without enough resources are likely to hang if TaskGroups in one ScheduleGroup are scheduled in reverse-topological order.
    - Added `synchronized` modifier to `MetricManagerWorker#flushMetricMessageQueueToMaster` method, since this method can be invoked by multiple threads (by `scheduledExecutorService` and `MetricManagerWorker#close`) but it is not thread-safe.
    - Renamed `PendingTaskGroupQueue#enqueue` to `PendingTaskGroupQueue#add`
    - Fix wrong assertion of SingleTaskGroupQueueTest
    - Fix SingleTaskGroupQueueTest to rethrow any exceptions in pseudo-scheduler thread
    
    **Tests for the changes:**
    - Existing tests should cover the changes.
    
    **Other comments:**
    - None
    
    resolves [NEMO-42](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-42)
---
 .../main/java/edu/snu/nemo/client/JobLauncher.java |   1 -
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |   7 -
 examples/resources/sample_executor_resources.json  |   6 +-
 .../nemo/runtime/executor/MetricManagerWorker.java |   2 +-
 .../nemo/runtime/master/BlockManagerMaster.java    |   2 +-
 .../master/scheduler/BatchSingleJobScheduler.java  |  23 ++-
 ...pQueue.java => PendingTaskGroupCollection.java} |  29 ++-
 .../scheduler/RoundRobinSchedulingPolicy.java      | 155 ++++-----------
 .../nemo/runtime/master/scheduler/Scheduler.java   |   3 +-
 .../runtime/master/scheduler/SchedulerRunner.java  | 119 ++++++++++--
 .../runtime/master/scheduler/SchedulingPolicy.java |   6 -
 .../scheduler/SingleJobTaskGroupCollection.java    | 214 +++++++++++++++++++++
 .../master/scheduler/SingleJobTaskGroupQueue.java  | 212 --------------------
 .../SourceLocationAwareSchedulingPolicy.java       | 151 ++++-----------
 .../snu/nemo/tests/runtime/RuntimeTestUtil.java    |   9 +-
 .../executor/datatransfer/DataTransferTest.java    |   5 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     |  12 +-
 .../master/scheduler/FaultToleranceTest.java       |  39 ++--
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |  88 +--------
 .../master/scheduler/SingleTaskGroupQueueTest.java | 152 ++++-----------
 .../SourceLocationAwareSchedulingPolicyTest.java   |  73 +------
 21 files changed, 503 insertions(+), 805 deletions(-)

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index 916c9e8..4440a86 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -213,7 +213,6 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.ExecutorJsonPath.class);
     cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
     cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
-    cl.registerShortNameOfClass(JobConf.SchedulerTimeoutMs.class);
     cl.registerShortNameOfClass(JobConf.MaxScheduleAttempt.class);
     cl.registerShortNameOfClass(JobConf.FileDirectory.class);
     cl.registerShortNameOfClass(JobConf.GlusterVolumeDirectory.class);
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 255e8bb..9bfc870 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -142,13 +142,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
-   * Scheduler timeout in ms.
-   */
-  @NamedParameter(doc = "Scheduler timeout in ms", short_name = "scheduler_timeout_ms", default_value = "50")
-  public final class SchedulerTimeoutMs implements Name<Integer> {
-  }
-
-  /**
    * Max number of attempts for task group scheduling.
    */
   @NamedParameter(doc = "Max number of schedules", short_name = "max_schedule_attempt", default_value = "3")
diff --git a/examples/resources/sample_executor_resources.json b/examples/resources/sample_executor_resources.json
index 5f5b2e0..5765bf3 100644
--- a/examples/resources/sample_executor_resources.json
+++ b/examples/resources/sample_executor_resources.json
@@ -2,16 +2,16 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 1
+    "capacity": 5
   },
   {
     "type": "Reserved",
     "memory_mb": 512,
-    "capacity": 1
+    "capacity": 5
   },
   {
     "type": "Compute",
     "memory_mb": 512,
-    "capacity": 1
+    "capacity": 5
   }
 ]
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index 7bcb2bc..a6c0bfd 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -53,7 +53,7 @@ public final class MetricManagerWorker implements MetricMessageSender {
                                                       flushingPeriod, TimeUnit.MILLISECONDS);
   }
 
-  private void flushMetricMessageQueueToMaster() {
+  private synchronized void flushMetricMessageQueueToMaster() {
     if (!metricMessageQueue.isEmpty()) {
       // Build batched metric messages
       int size = metricMessageQueue.size();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 4dc2e4c..7afb373 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -180,7 +180,7 @@ public final class BlockManagerMaster {
   /**
    * To be called when a potential producer task group is scheduled.
    * To be precise, it is called when the task group is enqueued to
-   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue}.
+   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection}.
    *
    * @param scheduledTaskGroupId the ID of the scheduled task group.
    */
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index f12293f..d579b4d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -62,7 +62,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    */
   private final SchedulingPolicy schedulingPolicy;
   private final SchedulerRunner schedulerRunner;
-  private final PendingTaskGroupQueue pendingTaskGroupQueue;
+  private final PendingTaskGroupCollection pendingTaskGroupCollection;
 
   /**
    * Other necessary components of this {@link edu.snu.nemo.runtime.master.RuntimeMaster}.
@@ -80,13 +80,13 @@ public final class BatchSingleJobScheduler implements Scheduler {
   @Inject
   public BatchSingleJobScheduler(final SchedulingPolicy schedulingPolicy,
                                  final SchedulerRunner schedulerRunner,
-                                 final PendingTaskGroupQueue pendingTaskGroupQueue,
+                                 final PendingTaskGroupCollection pendingTaskGroupCollection,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
                                  final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler) {
     this.schedulingPolicy = schedulingPolicy;
     this.schedulerRunner = schedulerRunner;
-    this.pendingTaskGroupQueue = pendingTaskGroupQueue;
+    this.pendingTaskGroupCollection = pendingTaskGroupCollection;
     this.blockManagerMaster = blockManagerMaster;
     this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
     updatePhysicalPlanEventHandler.setScheduler(this);
@@ -107,7 +107,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     this.jobStateManager = scheduledJobStateManager;
 
     schedulerRunner.scheduleJob(scheduledJobStateManager);
-    pendingTaskGroupQueue.onJobScheduled(physicalPlan);
+    pendingTaskGroupCollection.onJobScheduled(physicalPlan);
 
     LOG.info("Job to schedule: {}", jobToSchedule.getId());
 
@@ -167,6 +167,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
     schedulingPolicy.onExecutorAdded(executorRepresenter);
+    schedulerRunner.onAnExecutorAvailable();
   }
 
   @Override
@@ -195,7 +196,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
   @Override
   public void terminate() {
     this.schedulerRunner.terminate();
-    this.pendingTaskGroupQueue.close();
+    this.pendingTaskGroupCollection.close();
   }
 
   /**
@@ -229,7 +230,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
   }
 
   /**
-   * Selects the list of stages to schedule, in the order they must be added to {@link PendingTaskGroupQueue}.
+   * Selects the list of stages to schedule, in the order they must be added to {@link PendingTaskGroupCollection}.
    *
    * This is a recursive function that decides which schedule group to schedule upon a stage completion, or a failure.
    * It takes the currentScheduleGroupIndex as a reference point to begin looking for the stages to execute:
@@ -245,7 +246,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @param currentScheduleGroupIndex
    *      the index of the schedule group that is executing/has executed when this method is called.
    * @return an optional of the (possibly empty) list of next schedulable stages, in the order they should be
-   * enqueued to {@link PendingTaskGroupQueue}.
+   * enqueued to {@link PendingTaskGroupCollection}.
    */
   private Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
     if (currentScheduleGroupIndex > initialScheduleGroup) {
@@ -376,10 +377,11 @@ public final class BatchSingleJobScheduler implements Scheduler {
       blockManagerMaster.onProducerTaskGroupScheduled(taskGroupId);
       final int taskGroupIdx = RuntimeIdGenerator.getIndexFromTaskGroupId(taskGroupId);
       LOG.debug("Enquing {}", taskGroupId);
-      pendingTaskGroupQueue.enqueue(new ScheduledTaskGroup(physicalPlan.getId(),
+      pendingTaskGroupCollection.add(new ScheduledTaskGroup(physicalPlan.getId(),
           stageToSchedule.getSerializedTaskGroupDag(), taskGroupId, stageIncomingEdges, stageOutgoingEdges, attemptIdx,
           stageToSchedule.getContainerType(), logicalTaskIdToReadables.get(taskGroupIdx)));
     });
+    schedulerRunner.onATaskGroupAvailable();
   }
 
   /**
@@ -438,6 +440,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
         scheduleNextStage(stageIdForTaskGroupUponCompletion);
       }
     }
+    schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
@@ -475,6 +478,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     } else {
       onTaskGroupExecutionComplete(executorId, taskGroupId, true);
     }
+    schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
@@ -504,7 +508,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
           for (final PhysicalStage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
             if (stage.getId().equals(stageId)) {
               LOG.info("Removing TaskGroups for {} before they are scheduled to an executor", stage.getId());
-              pendingTaskGroupQueue.removeTaskGroupsAndDescendants(stage.getId());
+              pendingTaskGroupCollection.removeTaskGroupsAndDescendants(stage.getId());
               stage.getTaskGroupIds().forEach(dstTaskGroupId -> {
                 if (jobStateManager.getTaskGroupState(dstTaskGroupId).getStateMachine().getCurrentState()
                     != TaskGroupState.State.COMPLETE) {
@@ -542,5 +546,6 @@ public final class BatchSingleJobScheduler implements Scheduler {
       default:
         throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
     }
+    schedulerRunner.onAnExecutorAvailable();
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupQueue.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
similarity index 65%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupQueue.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
index 5b59de4..db3a084 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupQueue.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
@@ -22,6 +22,8 @@ import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
+import java.util.Collection;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 
 /**
@@ -31,21 +33,30 @@ import java.util.Optional;
  */
 @ThreadSafe
 @DriverSide
-@DefaultImplementation(SingleJobTaskGroupQueue.class)
-public interface PendingTaskGroupQueue {
+@DefaultImplementation(SingleJobTaskGroupCollection.class)
+public interface PendingTaskGroupCollection {
 
   /**
-   * Enqueues a TaskGroup to this PQ.
-   * @param scheduledTaskGroup to enqueue.
+   * Adds a TaskGroup to this collection.
+   * @param scheduledTaskGroup to add.
    */
-  void enqueue(final ScheduledTaskGroup scheduledTaskGroup);
+  void add(final ScheduledTaskGroup scheduledTaskGroup);
 
   /**
-   * Dequeues the next TaskGroup to be scheduled.
-   * @return an optional of the the next TaskGroup to be scheduled,
-   * an empty optional if no such TaskGroup exists.
+   * Removes the specified TaskGroup to be scheduled.
+   * @param taskGroupId id of the TaskGroup
+   * @return the specified TaskGroup
+   * @throws NoSuchElementException if the specified TaskGroup is not in the queue,
+   *                                or removing this TaskGroup breaks scheduling order
    */
-  Optional<ScheduledTaskGroup> dequeue();
+  ScheduledTaskGroup remove(final String taskGroupId) throws NoSuchElementException;
+
+  /**
+   * Peeks TaskGroups that can be scheduled according to job dependency priority.
+   * Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
+   * @return TaskGroups that can be scheduled, or {@link Optional#empty()} if the queue is empty
+   */
+  Optional<Collection<ScheduledTaskGroup>> peekSchedulableTaskGroups();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index 0db67ad..fa5f647 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,23 +16,16 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
-import org.apache.reef.tang.annotations.Parameter;
 
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.stream.Collectors;
@@ -49,23 +42,9 @@ import java.util.stream.Collectors;
 public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
   private static final Logger LOG = LoggerFactory.getLogger(RoundRobinSchedulingPolicy.class.getName());
 
-  private final int scheduleTimeoutMs;
-
   private final ExecutorRegistry executorRegistry;
 
   /**
-   * Thread safety is provided by this lock as multiple threads can call the methods in this class concurrently.
-   */
-  private final Lock lock;
-
-  /**
-   * Executor allocation is achieved by putting conditions for each container type.
-   * The condition blocks when there is no executor of the container type available,
-   * and is released when such an executor becomes available (either by an extra executor, or a task group completion).
-   */
-  private final Map<String, Condition> conditionByContainerType;
-
-  /**
    * The pool of executors available for each container type.
    */
   private final Map<String, List<String>> executorIdByContainerType;
@@ -78,129 +57,73 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
 
   @Inject
   @VisibleForTesting
-  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry,
-                                    @Parameter(JobConf.SchedulerTimeoutMs.class) final int scheduleTimeoutMs) {
-    this.scheduleTimeoutMs = scheduleTimeoutMs;
+  public RoundRobinSchedulingPolicy(final ExecutorRegistry executorRegistry) {
     this.executorRegistry = executorRegistry;
-    this.lock = new ReentrantLock();
     this.executorIdByContainerType = new HashMap<>();
-    this.conditionByContainerType = new HashMap<>();
     this.nextExecutorIndexByContainerType = new HashMap<>();
     initializeContainerTypeIfAbsent(ExecutorPlacementProperty.NONE); // Need this to avoid potential null errors
   }
 
-  public long getScheduleTimeoutMs() {
-    return scheduleTimeoutMs;
-  }
-
   @Override
   public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
                                    final JobStateManager jobStateManager) {
-    lock.lock();
-    try {
-      final String containerType = scheduledTaskGroup.getContainerType();
-      initializeContainerTypeIfAbsent(containerType);
-
-      Optional<String> executorId = selectExecutorByRR(containerType);
-      if (!executorId.isPresent()) { // If there is no available executor to schedule this task group now,
-        // TODO #696 Sleep Time Per Container Type in Scheduling Policy
-        final boolean executorAvailable =
-            conditionByContainerType.get(containerType).await(scheduleTimeoutMs, TimeUnit.MILLISECONDS);
-        if (executorAvailable) { // if an executor has become available before scheduleTimeoutMs,
-          executorId = selectExecutorByRR(containerType);
-          if (executorId.isPresent()) {
-            scheduleTaskGroup(selectExecutorByRR(containerType).get(), scheduledTaskGroup, jobStateManager);
-            return true;
-          } else {
-            throw new SchedulingException(new Throwable("An executor must be available at this point"));
-          }
-        }
-        return false;
-      } else {
-        scheduleTaskGroup(executorId.get(), scheduledTaskGroup, jobStateManager);
-        return true;
-      }
-    } catch (final Exception e) {
-      throw new SchedulingException(e);
-    } finally {
-      lock.unlock();
+    final String containerType = scheduledTaskGroup.getContainerType();
+    initializeContainerTypeIfAbsent(containerType);
+
+    Optional<String> executorId = selectExecutorByRR(containerType);
+    if (!executorId.isPresent()) { // If there is no available executor to schedule this task group now,
+      return false;
+    } else {
+      scheduleTaskGroup(executorId.get(), scheduledTaskGroup, jobStateManager);
+      return true;
     }
   }
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executor) {
-    lock.lock();
-    try {
-      executorRegistry.registerRepresenter(executor);
-      final String containerType = executor.getContainerType();
-      initializeContainerTypeIfAbsent(containerType);
+    executorRegistry.registerRepresenter(executor);
+    final String containerType = executor.getContainerType();
+    initializeContainerTypeIfAbsent(containerType);
 
-      executorIdByContainerType.get(containerType)
-          .add(nextExecutorIndexByContainerType.get(containerType), executor.getExecutorId());
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
+    executorIdByContainerType.get(containerType)
+        .add(nextExecutorIndexByContainerType.get(containerType), executor.getExecutorId());
   }
 
   @Override
   public Set<String> onExecutorRemoved(final String executorId) {
-    lock.lock();
-    try {
-      executorRegistry.setRepresenterAsFailed(executorId);
-      final ExecutorRepresenter executor = executorRegistry.getFailedExecutorRepresenter(executorId);
-      executor.onExecutorFailed();
+    executorRegistry.setRepresenterAsFailed(executorId);
+    final ExecutorRepresenter executor = executorRegistry.getFailedExecutorRepresenter(executorId);
+    executor.onExecutorFailed();
 
-      final String containerType = executor.getContainerType();
+    final String containerType = executor.getContainerType();
 
-      final List<String> executorIdList = executorIdByContainerType.get(containerType);
-      int nextExecutorIndex = nextExecutorIndexByContainerType.get(containerType);
-
-      final int executorAssignmentLocation = executorIdList.indexOf(executorId);
-      if (executorAssignmentLocation < nextExecutorIndex) {
-        nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex - 1);
-      } else if (executorAssignmentLocation == nextExecutorIndex) {
-        nextExecutorIndexByContainerType.put(containerType, 0);
-      }
-      executorIdList.remove(executorId);
+    final List<String> executorIdList = executorIdByContainerType.get(containerType);
+    int nextExecutorIndex = nextExecutorIndexByContainerType.get(containerType);
 
-      return Collections.unmodifiableSet(executor.getFailedTaskGroups());
-    } finally {
-      lock.unlock();
+    final int executorAssignmentLocation = executorIdList.indexOf(executorId);
+    if (executorAssignmentLocation < nextExecutorIndex) {
+      nextExecutorIndexByContainerType.put(containerType, nextExecutorIndex - 1);
+    } else if (executorAssignmentLocation == nextExecutorIndex) {
+      nextExecutorIndexByContainerType.put(containerType, 0);
     }
+    executorIdList.remove(executorId);
+
+    return Collections.unmodifiableSet(executor.getFailedTaskGroups());
   }
 
   @Override
   public void onTaskGroupExecutionComplete(final String executorId, final String taskGroupId) {
-    lock.lock();
-    try {
-      final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
-      executor.onTaskGroupExecutionComplete(taskGroupId);
-      LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
-
-      // the scheduler thread may be waiting for a free slot...
-      final String containerType = executor.getContainerType();
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
+    final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
+    executor.onTaskGroupExecutionComplete(taskGroupId);
+    LOG.info("{" + taskGroupId + "} completed in [" + executorId + "]");
   }
 
   @Override
   public void onTaskGroupExecutionFailed(final String executorId, final String taskGroupId) {
-    lock.lock();
-    try {
-      final ExecutorRepresenter executor = executorRegistry.getExecutorRepresenter(executorId);
-
-      executor.onTaskGroupExecutionFailed(taskGroupId);
-      LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
+    final ExecutorRepresenter executor = executorRegistry.getExecutorRepresenter(executorId);
 
-      // the scheduler thread may be waiting for a free slot...
-      final String containerType = executor.getContainerType();
-      signalPossiblyWaitingScheduler(containerType);
-    } finally {
-      lock.unlock();
-    }
+    executor.onTaskGroupExecutionFailed(taskGroupId);
+    LOG.info("{" + taskGroupId + "} failed in [" + executorId + "]");
   }
 
   @Override
@@ -276,13 +199,5 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
   private void initializeContainerTypeIfAbsent(final String containerType) {
     executorIdByContainerType.putIfAbsent(containerType, new ArrayList<>());
     nextExecutorIndexByContainerType.putIfAbsent(containerType, 0);
-    conditionByContainerType.putIfAbsent(containerType, lock.newCondition());
-  }
-
-  private void signalPossiblyWaitingScheduler(final String typeOfContainerWithNewFreeSlot) {
-    conditionByContainerType.get(typeOfContainerWithNewFreeSlot).signal();
-    if (!typeOfContainerWithNewFreeSlot.equals(ExecutorPlacementProperty.NONE)) {
-      conditionByContainerType.get(ExecutorPlacementProperty.NONE).signal();
-    }
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index 65f44dc..deef9d8 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -27,7 +27,8 @@ import javax.annotation.Nullable;
 
 /**
  * Only two threads call scheduling code: RuntimeMaster thread (RMT), and SchedulerThread(ST).
- * RMT and ST meet only at two points: SchedulingPolicy, and PendingTaskGroupQueue, which are synchronized(ThreadSafe).
+ * RMT and ST meet only at two points: SchedulingPolicy, and PendingTaskGroupCollection,
+ * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
  *
  * Receives jobs to execute and schedules
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index c6079d6..6806d74 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -23,6 +23,10 @@ import org.apache.reef.annotations.audience.DriverSide;
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,16 +43,18 @@ public final class SchedulerRunner {
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
   private final SchedulingPolicy schedulingPolicy;
-  private final PendingTaskGroupQueue pendingTaskGroupQueue;
+  private final PendingTaskGroupCollection pendingTaskGroupCollection;
   private final ExecutorService schedulerThread;
   private boolean initialJobScheduled;
   private boolean isTerminated;
+  private final DelayedSignalingCondition mustCheckSchedulingAvailabilityOrSchedulerTerminated
+      = new DelayedSignalingCondition();
 
   @Inject
   public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
-                         final PendingTaskGroupQueue pendingTaskGroupQueue) {
+                         final PendingTaskGroupCollection pendingTaskGroupCollection) {
     this.jobStateManagers = new HashMap<>();
-    this.pendingTaskGroupQueue = pendingTaskGroupQueue;
+    this.pendingTaskGroupCollection = pendingTaskGroupCollection;
     this.schedulingPolicy = schedulingPolicy;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
     this.initialJobScheduled = false;
@@ -56,6 +62,20 @@ public final class SchedulerRunner {
   }
 
   /**
+   * Signals to the condition on executor availability.
+   */
+  public void onAnExecutorAvailable() {
+    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+  }
+
+  /**
+   * Signals to the condition on TaskGroup availability.
+   */
+  public void onATaskGroupAvailable() {
+    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
+  }
+
+  /**
    * Begin scheduling a job.
    * @param jobStateManager the corresponding {@link JobStateManager}
    */
@@ -74,6 +94,7 @@ public final class SchedulerRunner {
   void terminate() {
     schedulingPolicy.terminate();
     isTerminated = true;
+    mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
   }
 
   /**
@@ -82,28 +103,43 @@ public final class SchedulerRunner {
   private final class SchedulerThread implements Runnable {
     @Override
     public void run() {
-      while (!isTerminated) {
-        try {
-          Optional<ScheduledTaskGroup> nextTaskGroupToSchedule;
-          do {
-            nextTaskGroupToSchedule = pendingTaskGroupQueue.dequeue();
-          } while (!nextTaskGroupToSchedule.isPresent());
+      // Run the first iteration unconditionally
+      mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
 
-          final JobStateManager jobStateManager = jobStateManagers.get(nextTaskGroupToSchedule.get().getJobId());
-          final boolean isScheduled =
-              schedulingPolicy.scheduleTaskGroup(nextTaskGroupToSchedule.get(), jobStateManager);
+      while (!isTerminated) {
+        // Iteration guard
+        mustCheckSchedulingAvailabilityOrSchedulerTerminated.await();
 
-          if (!isScheduled) {
-            LOG.info("Failed to assign an executor for {} before the timeout: {}",
-                new Object[]{nextTaskGroupToSchedule.get().getTaskGroupId(),
-                    schedulingPolicy.getScheduleTimeoutMs()});
+        final Collection<ScheduledTaskGroup> schedulableTaskGroups = pendingTaskGroupCollection
+            .peekSchedulableTaskGroups().orElse(null);
+        if (schedulableTaskGroups == null) {
+          // TaskGroup queue is empty
+          LOG.debug("PendingTaskGroupCollection is empty. Awaiting for more TaskGroups...");
+          continue;
+        }
 
-            // Put this TaskGroup back to the queue since we failed to schedule it.
-            pendingTaskGroupQueue.enqueue(nextTaskGroupToSchedule.get());
+        int numScheduledTaskGroups = 0;
+        for (final ScheduledTaskGroup schedulableTaskGroup : schedulableTaskGroups) {
+          final JobStateManager jobStateManager = jobStateManagers.get(schedulableTaskGroup.getJobId());
+          LOG.debug("Trying to schedule {}...", schedulableTaskGroup.getTaskGroupId());
+          final boolean isScheduled =
+              schedulingPolicy.scheduleTaskGroup(schedulableTaskGroup, jobStateManager);
+          if (isScheduled) {
+            LOG.debug("Successfully scheduled {}", schedulableTaskGroup.getTaskGroupId());
+            pendingTaskGroupCollection.remove(schedulableTaskGroup.getTaskGroupId());
+            numScheduledTaskGroups++;
+          } else {
+            LOG.debug("Failed to schedule {}", schedulableTaskGroup.getTaskGroupId());
           }
-        } catch (final Exception e) {
-          e.printStackTrace();
-          throw e;
+        }
+
+        LOG.debug("Examined {} TaskGroups, scheduled {} TaskGroups",
+            schedulableTaskGroups.size(), numScheduledTaskGroups);
+        if (schedulableTaskGroups.size() == numScheduledTaskGroups) {
+          // Scheduled all TaskGroups in the stage
+          // Immediately run next iteration to check whether there is another schedulable stage
+          LOG.debug("Trying to schedule next Stage in the ScheduleGroup (if any)...");
+          mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
         }
       }
       jobStateManagers.values().forEach(jobStateManager -> {
@@ -116,4 +152,45 @@ public final class SchedulerRunner {
       LOG.info("SchedulerRunner Terminated!");
     }
   }
+
+  /**
+   * A {@link Condition} that allows 'delayed' signaling.
+   */
+  private final class DelayedSignalingCondition {
+    private final AtomicBoolean hasDelayedSignal = new AtomicBoolean(false);
+    private final Lock lock = new ReentrantLock();
+    private final Condition condition = lock.newCondition();
+
+    /**
+     * Signals this condition. If no thread is currently awaiting this condition,
+     * the signal is retained until the next {@link #await} invocation.
+     */
+    public void signal() {
+      lock.lock();
+      try {
+        hasDelayedSignal.set(true);
+        condition.signal();
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    /**
+     * Awaits this condition. The thread returns immediately if there is a delayed signal,
+     * or otherwise wakes up on the next {@link #signal} invocation.
+     */
+    public void await() {
+      lock.lock();
+      try {
+        if (!hasDelayedSignal.get()) {
+          condition.await();
+        }
+        hasDelayedSignal.set(false);
+      } catch (final InterruptedException e) {
+        throw new RuntimeException(e);
+      } finally {
+        lock.unlock();
+      }
+    }
+  }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index 59f9adf..061adae 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -33,12 +33,6 @@ import java.util.Set;
 public interface SchedulingPolicy {
 
   /**
-   * Returns this scheduling policy's timeout before an executor assignment.
-   * @return the timeout in milliseconds.
-   */
-  long getScheduleTimeoutMs();
-
-  /**
    * Attempts to schedule the given taskGroup to an executor according to this policy.
    * If there is no executor available for the taskGroup, it waits for an executor to be assigned before it times out.
    * (Depending on the executor's resource type)
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
new file mode 100644
index 0000000..51c5db7
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.scheduler;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
+import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import net.jcip.annotations.ThreadSafe;
+import org.apache.reef.annotations.audience.DriverSide;
+
+import javax.inject.Inject;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * {@link PendingTaskGroupCollection} implementation.
+ * This class provides two-level scheduling by keeping track of schedulable stages and stage-TaskGroup membership.
+ * {@link #peekSchedulableTaskGroups()} returns collection of TaskGroups which belong to one of the schedulable stages.
+ */
+@ThreadSafe
+@DriverSide
+public final class SingleJobTaskGroupCollection implements PendingTaskGroupCollection {
+  private PhysicalPlan physicalPlan;
+
+  /**
+   * Pending TaskGroups awaiting to be scheduled for each stage.
+   */
+  private final ConcurrentMap<String, Map<String, ScheduledTaskGroup>> stageIdToPendingTaskGroups;
+
+  /**
+   * Stages with TaskGroups that have not yet been scheduled.
+   */
+  private final BlockingDeque<String> schedulableStages;
+
+  @Inject
+  public SingleJobTaskGroupCollection() {
+    stageIdToPendingTaskGroups = new ConcurrentHashMap<>();
+    schedulableStages = new LinkedBlockingDeque<>();
+  }
+
+  @Override
+  public synchronized void add(final ScheduledTaskGroup scheduledTaskGroup) {
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
+
+    stageIdToPendingTaskGroups.compute(stageId, (s, taskGroupIdToTaskGroup) -> {
+      if (taskGroupIdToTaskGroup == null) {
+        final Map<String, ScheduledTaskGroup> taskGroupIdToTaskGroupMap = new HashMap<>();
+        taskGroupIdToTaskGroupMap.put(scheduledTaskGroup.getTaskGroupId(), scheduledTaskGroup);
+        updateSchedulableStages(stageId, scheduledTaskGroup.getContainerType());
+        return taskGroupIdToTaskGroupMap;
+      } else {
+        taskGroupIdToTaskGroup.put(scheduledTaskGroup.getTaskGroupId(), scheduledTaskGroup);
+        return taskGroupIdToTaskGroup;
+      }
+    });
+  }
+
+  /**
+   * Removes the specified TaskGroup to be scheduled.
+   * The specified TaskGroup should belong to the collection from {@link #peekSchedulableTaskGroups()}.
+   * @param taskGroupId id of the TaskGroup
+   * @return the specified TaskGroup
+   * @throws NoSuchElementException if the specified TaskGroup is not in the queue,
+   *                                or removing this TaskGroup breaks scheduling order
+   *                                (i.e. does not belong to the collection from {@link #peekSchedulableTaskGroups()}).
+   */
+  @Override
+  public synchronized ScheduledTaskGroup remove(final String taskGroupId) throws NoSuchElementException {
+    final String stageId = schedulableStages.peekFirst();
+    if (stageId == null) {
+      throw new NoSuchElementException("No schedulable stage in TaskGroup queue");
+    }
+
+    final Map<String, ScheduledTaskGroup> pendingTaskGroupsForStage = stageIdToPendingTaskGroups.get(stageId);
+
+    if (pendingTaskGroupsForStage == null) {
+      throw new RuntimeException(String.format("Stage %s not found in TaskGroup queue", stageId));
+    }
+    final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupsForStage.remove(taskGroupId);
+    if (taskGroupToSchedule == null) {
+      throw new NoSuchElementException(String.format("TaskGroup %s not found in TaskGroup queue", taskGroupId));
+    }
+    if (pendingTaskGroupsForStage.isEmpty()) {
+      if (!schedulableStages.pollFirst().equals(stageId)) {
+        throw new RuntimeException(String.format("Expected stage %s to be polled", stageId));
+      }
+      stageIdToPendingTaskGroups.remove(stageId);
+      stageIdToPendingTaskGroups.forEach((scheduledStageId, taskGroups) ->
+          updateSchedulableStages(scheduledStageId, taskGroups.values().iterator().next().getContainerType()));
+    }
+
+    return taskGroupToSchedule;
+  }
+
+  /**
+   * Peeks TaskGroups that can be scheduled.
+   * The collection itself is not modified; use {@link #remove(String)} after a TaskGroup has been scheduled.
+   * @return collection of TaskGroups which belong to one of the schedulable stages
+   *         or {@link Optional#empty} if the queue is empty
+   */
+  @Override
+  public synchronized Optional<Collection<ScheduledTaskGroup>> peekSchedulableTaskGroups() {
+    final String stageId = schedulableStages.peekFirst();
+    if (stageId == null) {
+      return Optional.empty();
+    }
+
+    final Map<String, ScheduledTaskGroup> pendingTaskGroupsForStage = stageIdToPendingTaskGroups.get(stageId);
+    if (pendingTaskGroupsForStage == null) {
+      throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTaskGroups map", stageId));
+    }
+    return Optional.of(new ArrayList<>(pendingTaskGroupsForStage.values()));
+  }
+
+  /**
+   * Removes a stage and its descendant stages from this collection.
+   * @param stageId for the stage to begin the removal recursively.
+   */
+  @Override
+  public synchronized void removeTaskGroupsAndDescendants(final String stageId) {
+    removeStageAndChildren(stageId);
+  }
+
+  /**
+   * Recursively removes a stage and its child stages from this collection.
+   * @param stageId for the stage to begin the removal recursively.
+   */
+  private synchronized void removeStageAndChildren(final String stageId) {
+    schedulableStages.remove(stageId);
+    stageIdToPendingTaskGroups.remove(stageId);
+
+    physicalPlan.getStageDAG().getChildren(stageId).forEach(
+        physicalStage -> removeStageAndChildren(physicalStage.getId()));
+  }
+
+  /**
+   * Updates the two-level data structure by examining a new candidate stage.
+   * If there are no stages with higher priority, the candidate can be made schedulable.
+   *
+   * NOTE: This method provides the "line up" between stages, by assigning priorities,
+   * serving as the key to the "priority" implementation of this class.
+   * @param candidateStageId for the stage that can potentially be scheduled.
+   * @param candidateStageContainerType for the stage that can potentially be scheduled.
+   */
+  private synchronized void updateSchedulableStages(
+      final String candidateStageId, final String candidateStageContainerType) {
+    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
+
+    if (isSchedulable(candidateStageId, candidateStageContainerType)) {
+      // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
+      jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
+        if (schedulableStages.contains(ancestorStage.getId())) {
+          // Remove the ancestor stage if it is of the same container type.
+          if (candidateStageContainerType.equals(ancestorStage.getContainerType())) {
+            schedulableStages.remove(ancestorStage.getId());
+          }
+        }
+      });
+      if (!schedulableStages.contains(candidateStageId)) {
+        schedulableStages.addLast(candidateStageId);
+      }
+    }
+  }
+
+  /**
+   * Determines whether the given candidate stage is schedulable immediately or not.
+   * @param candidateStageId for the stage that can potentially be scheduled.
+   * @param candidateStageContainerType for the stage that can potentially be scheduled.
+   * @return true if schedulable, false otherwise.
+   */
+  private synchronized boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
+    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
+    for (final PhysicalStage descendantStage : jobDAG.getDescendants(candidateStageId)) {
+      if (schedulableStages.contains(descendantStage.getId())) {
+        if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public synchronized void onJobScheduled(final PhysicalPlan physicalPlanForJob) {
+    this.physicalPlan = physicalPlanForJob;
+  }
+
+  @Override
+  public synchronized boolean isEmpty() {
+    return schedulableStages.isEmpty();
+  }
+
+  @Override
+  public synchronized void close() {
+    schedulableStages.clear();
+    stageIdToPendingTaskGroups.clear();
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java
deleted file mode 100644
index c605390..0000000
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupQueue.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.master.scheduler;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.exception.SchedulingException;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
-import net.jcip.annotations.ThreadSafe;
-import org.apache.reef.annotations.audience.DriverSide;
-
-import javax.inject.Inject;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.function.BiFunction;
-
-/**
- * Keep tracks of all pending task groups.
- * This class provides two-level queue scheduling by prioritizing TaskGroups of certain stages to be scheduled first.
- * Stages that are mutually independent alternate turns in scheduling each of their TaskGroups.
- * This PQ assumes that stages/task groups of higher priorities are never enqueued without first removing
- * those of lower priorities (which is how Scheduler behaves) for simplicity.
- */
-@ThreadSafe
-@DriverSide
-public final class SingleJobTaskGroupQueue implements PendingTaskGroupQueue {
-  private PhysicalPlan physicalPlan;
-
-  /**
-   * Pending TaskGroups awaiting to be scheduled for each stage.
-   */
-  private final ConcurrentMap<String, Deque<ScheduledTaskGroup>> stageIdToPendingTaskGroups;
-
-  /**
-   * Stages with TaskGroups that have not yet been scheduled.
-   */
-  private final BlockingDeque<String> schedulableStages;
-
-  @Inject
-  public SingleJobTaskGroupQueue() {
-    stageIdToPendingTaskGroups = new ConcurrentHashMap<>();
-    schedulableStages = new LinkedBlockingDeque<>();
-  }
-
-  @Override
-  public void enqueue(final ScheduledTaskGroup scheduledTaskGroup) {
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
-
-    synchronized (stageIdToPendingTaskGroups) {
-      stageIdToPendingTaskGroups.compute(stageId,
-          new BiFunction<String, Deque<ScheduledTaskGroup>, Deque<ScheduledTaskGroup>>() {
-            @Override
-            public Deque<ScheduledTaskGroup> apply(final String s,
-                                                   final Deque<ScheduledTaskGroup> scheduledTaskGroups) {
-              if (scheduledTaskGroups == null) {
-                final Deque<ScheduledTaskGroup> pendingTaskGroupsForStage = new ArrayDeque<>();
-                pendingTaskGroupsForStage.add(scheduledTaskGroup);
-                updateSchedulableStages(stageId, scheduledTaskGroup.getContainerType());
-                return pendingTaskGroupsForStage;
-              } else {
-                scheduledTaskGroups.add(scheduledTaskGroup);
-                return scheduledTaskGroups;
-              }
-            }
-          });
-    }
-  }
-
-  /**
-   * Dequeues the next TaskGroup to be scheduled according to job dependency priority.
-   * @return the next TaskGroup to be scheduled
-   */
-  @Override
-  public Optional<ScheduledTaskGroup> dequeue() {
-    ScheduledTaskGroup taskGroupToSchedule = null;
-    final String stageId;
-    try {
-      stageId = schedulableStages.takeFirst();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-      throw new SchedulingException(new Throwable("An exception occurred while trying to dequeue the next TaskGroup"));
-    }
-
-    synchronized (stageIdToPendingTaskGroups) {
-      final Deque<ScheduledTaskGroup> pendingTaskGroupsForStage = stageIdToPendingTaskGroups.get(stageId);
-
-      if (pendingTaskGroupsForStage == null) {
-        schedulableStages.addLast(stageId);
-      } else {
-        taskGroupToSchedule = pendingTaskGroupsForStage.poll();
-        if (pendingTaskGroupsForStage.isEmpty()) {
-          stageIdToPendingTaskGroups.remove(stageId);
-          stageIdToPendingTaskGroups.forEach((scheduledStageId, taskGroupList) ->
-              updateSchedulableStages(scheduledStageId, taskGroupList.getFirst().getContainerType()));
-        } else {
-          schedulableStages.addLast(stageId);
-        }
-      }
-    }
-
-    return (taskGroupToSchedule == null) ? Optional.empty()
-        : Optional.of(taskGroupToSchedule);
-  }
-
-  /**
-   * Removes a stage and its descendant stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  @Override
-  public void removeTaskGroupsAndDescendants(final String stageId) {
-    synchronized (stageIdToPendingTaskGroups) {
-      removeStageAndChildren(stageId);
-    }
-  }
-
-  /**
-   * Recursively removes a stage and its children stages from this PQ.
-   * @param stageId for the stage to begin the removal recursively.
-   */
-  private void removeStageAndChildren(final String stageId) {
-    schedulableStages.remove(stageId);
-    stageIdToPendingTaskGroups.remove(stageId);
-
-    physicalPlan.getStageDAG().getChildren(stageId).forEach(
-        physicalStage -> removeStageAndChildren(physicalStage.getId()));
-  }
-
-  /**
-   * Updates the two-level PQ by examining a new candidate stage.
-   * If there are no stages with higher priority, the candidate can be made schedulable.
-   *
-   * NOTE: This method provides the "line up" between stages, by assigning priorities,
-   * serving as the key to the "priority" implementation of this class.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be scheduled.
-   */
-  private void updateSchedulableStages(final String candidateStageId, final String candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
-
-    if (isSchedulable(candidateStageId, candidateStageContainerType)) {
-      // Check for ancestor stages that became schedulable due to candidateStage's absence from the queue.
-      jobDAG.getAncestors(candidateStageId).forEach(ancestorStage -> {
-        if (schedulableStages.contains(ancestorStage.getId())) {
-          // Remove the ancestor stage if it is of the same container type.
-          if (candidateStageContainerType.equals(ancestorStage.getContainerType())) {
-            schedulableStages.remove(ancestorStage.getId());
-          }
-        }
-      });
-      if (!schedulableStages.contains(candidateStageId)) {
-        schedulableStages.addLast(candidateStageId);
-      }
-    }
-  }
-
-  /**
-   * Determines whether the given candidate stage is schedulable immediately or not.
-   * @param candidateStageId for the stage that can potentially be scheduled.
-   * @param candidateStageContainerType for the stage that can potentially be scheduled.
-   * @return true if schedulable, false otherwise.
-   */
-  private boolean isSchedulable(final String candidateStageId, final String candidateStageContainerType) {
-    final DAG<PhysicalStage, PhysicalStageEdge> jobDAG = physicalPlan.getStageDAG();
-    for (final PhysicalStage descendantStage : jobDAG.getDescendants(candidateStageId)) {
-      if (schedulableStages.contains(descendantStage.getId())) {
-        if (candidateStageContainerType.equals(descendantStage.getContainerType())) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public void onJobScheduled(final PhysicalPlan physicalPlanForJob) {
-    this.physicalPlan = physicalPlanForJob;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    synchronized (stageIdToPendingTaskGroups) {
-      for (final String stageId : schedulableStages) {
-        if (!stageIdToPendingTaskGroups.get(stageId).isEmpty()) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  @Override
-  public void close() {
-    schedulableStages.clear();
-    stageIdToPendingTaskGroups.clear();
-  }
-}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index e02c259..9c69253 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
 import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
@@ -30,9 +29,6 @@ import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -48,9 +44,6 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
 
   private final ExecutorRegistry executorRegistry;
   private final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy;
-  private final long scheduleTimeoutMs;
-  private final Lock lock = new ReentrantLock();
-  private final Condition moreExecutorsAvailableCondition = lock.newCondition();
 
   /**
    * Injectable constructor for {@link SourceLocationAwareSchedulingPolicy}.
@@ -62,12 +55,6 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
                                               final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy) {
     this.executorRegistry = executorRegistry;
     this.roundRobinSchedulingPolicy = roundRobinSchedulingPolicy;
-    this.scheduleTimeoutMs = roundRobinSchedulingPolicy.getScheduleTimeoutMs();
-  }
-
-  @Override
-  public long getScheduleTimeoutMs() {
-    return scheduleTimeoutMs;
   }
 
   /**
@@ -82,40 +69,20 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
   @Override
   public boolean scheduleTaskGroup(final ScheduledTaskGroup scheduledTaskGroup,
                                    final JobStateManager jobStateManager) {
-    lock.lock();
+    Set<String> sourceLocations = Collections.emptySet();
     try {
-      Set<String> sourceLocations = Collections.emptySet();
-      try {
-        sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
-      } catch (final UnsupportedOperationException e) {
-        // do nothing
-      } catch (final Exception e) {
-        LOG.warn(String.format("Exception while trying to get source location for %s",
-            scheduledTaskGroup.getTaskGroupId()), e);
-      }
-      if (sourceLocations.size() == 0) {
-        // No source location information found, fall back to the RoundRobinSchedulingPolicy
-        return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, jobStateManager);
-      }
-
-      long timeoutInNanoseconds = scheduleTimeoutMs * 1000000;
-      while (timeoutInNanoseconds > 0) {
-        if (scheduleToLocalNode(scheduledTaskGroup, jobStateManager, sourceLocations)) {
-          return true;
-        }
-        try {
-          timeoutInNanoseconds = moreExecutorsAvailableCondition.awaitNanos(timeoutInNanoseconds);
-          // Signals on this condition does not necessarily guarantee that the added executor helps scheduling the
-          // TaskGroup we are interested in. We need to await again if the consequent scheduling attempt still fails,
-          // until we spend the time budget specified.
-        } catch (final InterruptedException e) {
-          throw new SchedulingException(e);
-        }
-      }
-      return false;
-    } finally {
-      lock.unlock();
+      sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+    } catch (final UnsupportedOperationException e) {
+      // do nothing
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (sourceLocations.size() == 0) {
+      // No source location information found, fall back to the RoundRobinSchedulingPolicy
+      return roundRobinSchedulingPolicy.scheduleTaskGroup(scheduledTaskGroup, jobStateManager);
     }
+
+    return scheduleToLocalNode(scheduledTaskGroup, jobStateManager, sourceLocations);
   }
 
   /**
@@ -128,78 +95,45 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
   private boolean scheduleToLocalNode(final ScheduledTaskGroup scheduledTaskGroup,
                                       final JobStateManager jobStateManager,
                                       final Set<String> sourceLocations) {
-    lock.lock();
-    try {
-      final List<ExecutorRepresenter> candidateExecutors =
-          selectExecutorByContainerTypeAndNodeNames(scheduledTaskGroup.getContainerType(), sourceLocations);
-      if (candidateExecutors.size() == 0) {
-        return false;
-      }
-      final int randomIndex = ThreadLocalRandom.current().nextInt(0, candidateExecutors.size());
-      final ExecutorRepresenter selectedExecutor = candidateExecutors.get(randomIndex);
-
-      jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
-      selectedExecutor.onTaskGroupScheduled(scheduledTaskGroup);
-      LOG.info("Scheduling {} (source location: {}) to {} (node name: {})", scheduledTaskGroup.getTaskGroupId(),
-          String.join(", ", sourceLocations), selectedExecutor.getExecutorId(),
-          selectedExecutor.getNodeName());
-      return true;
-    } finally {
-      lock.unlock();
+    final List<ExecutorRepresenter> candidateExecutors =
+        selectExecutorByContainerTypeAndNodeNames(scheduledTaskGroup.getContainerType(), sourceLocations);
+    if (candidateExecutors.size() == 0) {
+      return false;
     }
+    final int randomIndex = ThreadLocalRandom.current().nextInt(0, candidateExecutors.size());
+    final ExecutorRepresenter selectedExecutor = candidateExecutors.get(randomIndex);
+
+    jobStateManager.onTaskGroupStateChanged(scheduledTaskGroup.getTaskGroupId(), TaskGroupState.State.EXECUTING);
+    selectedExecutor.onTaskGroupScheduled(scheduledTaskGroup);
+    LOG.info("Scheduling {} (source location: {}) to {} (node name: {})", scheduledTaskGroup.getTaskGroupId(),
+        String.join(", ", sourceLocations), selectedExecutor.getExecutorId(),
+        selectedExecutor.getNodeName());
+    return true;
   }
 
   @Override
   public void onExecutorAdded(final ExecutorRepresenter executorRepresenter) {
-    lock.lock();
-    try {
-      moreExecutorsAvailableCondition.signal();
-      roundRobinSchedulingPolicy.onExecutorAdded(executorRepresenter);
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.onExecutorAdded(executorRepresenter);
   }
 
   @Override
   public Set<String> onExecutorRemoved(final String executorId) {
-    lock.lock();
-    try {
-      return roundRobinSchedulingPolicy.onExecutorRemoved(executorId);
-    } finally {
-      lock.unlock();
-    }
+    return roundRobinSchedulingPolicy.onExecutorRemoved(executorId);
   }
 
   @Override
   public void onTaskGroupExecutionComplete(final String executorId, final String taskGroupId) {
-    lock.lock();
-    try {
-      moreExecutorsAvailableCondition.signal();
-      roundRobinSchedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.onTaskGroupExecutionComplete(executorId, taskGroupId);
   }
 
   @Override
   public void onTaskGroupExecutionFailed(final String executorId, final String taskGroupId) {
-    lock.lock();
-    try {
-      moreExecutorsAvailableCondition.signal();
-      roundRobinSchedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.onTaskGroupExecutionFailed(executorId, taskGroupId);
   }
 
   @Override
   public void terminate() {
-    lock.lock();
-    try {
-      roundRobinSchedulingPolicy.terminate();
-    } finally {
-      lock.unlock();
-    }
+    roundRobinSchedulingPolicy.terminate();
   }
 
   /**
@@ -209,21 +143,16 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
    *         and has an empty slot for execution
    */
   private List<ExecutorRepresenter> selectExecutorByContainerTypeAndNodeNames(
-      final String containerType, final Set<String> nodeNames) {
-    lock.lock();
-    try {
-      final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = executorRegistry.getRunningExecutorIds().stream()
-          .map(executorId -> executorRegistry.getRunningExecutorRepresenter(executorId))
-          .filter(executor -> executor.getRunningTaskGroups().size() < executor.getExecutorCapacity())
-          .filter(executor -> nodeNames.contains(executor.getNodeName()));
-      if (containerType.equals(ExecutorPlacementProperty.NONE)) {
-        return localNodesWithSpareCapacity.collect(Collectors.toList());
-      } else {
-        return localNodesWithSpareCapacity.filter(executor -> executor.getContainerType().equals(containerType))
-            .collect(Collectors.toList());
-      }
-    } finally {
-      lock.unlock();
+    final String containerType, final Set<String> nodeNames) {
+    final Stream<ExecutorRepresenter> localNodesWithSpareCapacity = executorRegistry.getRunningExecutorIds().stream()
+        .map(executorId -> executorRegistry.getRunningExecutorRepresenter(executorId))
+        .filter(executor -> executor.getRunningTaskGroups().size() < executor.getExecutorCapacity())
+        .filter(executor -> nodeNames.contains(executor.getNodeName()));
+    if (containerType.equals(ExecutorPlacementProperty.NONE)) {
+      return localNodesWithSpareCapacity.collect(Collectors.toList());
+    } else {
+      return localNodesWithSpareCapacity.filter(executor -> executor.getContainerType().equals(containerType))
+          .collect(Collectors.toList());
     }
   }
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
index b4f411a..f95c452 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
@@ -21,7 +21,7 @@ import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
-import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
+import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
@@ -104,12 +104,13 @@ public final class RuntimeTestUtil {
     sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, newState, attemptIdx, null);
   }
 
-  public static void mockSchedulerRunner(final PendingTaskGroupQueue pendingTaskGroupQueue,
+  public static void mockSchedulerRunner(final PendingTaskGroupCollection pendingTaskGroupCollection,
                                          final SchedulingPolicy schedulingPolicy,
                                          final JobStateManager jobStateManager,
                                          final boolean isPartialSchedule) {
-    while (!pendingTaskGroupQueue.isEmpty()) {
-      final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupQueue.dequeue().get();
+    while (!pendingTaskGroupCollection.isEmpty()) {
+      final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupCollection.remove(
+          pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
 
       schedulingPolicy.scheduleTaskGroup(taskGroupToSchedule, jobStateManager);
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index 73a1b5d..80f00d2 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -98,7 +98,6 @@ import static org.mockito.Mockito.mock;
     SourceVertex.class})
 public final class DataTransferTest {
   private static final String EXECUTOR_ID_PREFIX = "Executor";
-  private static final int SCHEDULE_TIMEOUT = 1000;
   private static final DataStoreProperty.Value MEMORY_STORE = DataStoreProperty.Value.MemoryStore;
   private static final DataStoreProperty.Value SER_MEMORY_STORE = DataStoreProperty.Value.SerializedMemoryStore;
   private static final DataStoreProperty.Value LOCAL_FILE_STORE = DataStoreProperty.Value.LocalFileStore;
@@ -134,8 +133,8 @@ public final class DataTransferTest {
     final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     final SchedulingPolicy schedulingPolicy = new RoundRobinSchedulingPolicy(
-        injector.getInstance(ExecutorRegistry.class), SCHEDULE_TIMEOUT);
-    final PendingTaskGroupQueue taskGroupQueue = new SingleJobTaskGroupQueue();
+        injector.getInstance(ExecutorRegistry.class));
+    final PendingTaskGroupCollection taskGroupQueue = new SingleJobTaskGroupCollection();
     final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskGroupQueue);
     final Scheduler scheduler =
         new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, taskGroupQueue, master,
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
index 9afe504..c87bc81 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java
@@ -81,15 +81,13 @@ public final class BatchSingleJobSchedulerTest {
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskGroupQueue pendingTaskGroupQueue;
+  private PendingTaskGroupCollection pendingTaskGroupCollection;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
   private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
   private PhysicalPlanGenerator physicalPlanGenerator;
 
-  private static final int TEST_TIMEOUT_MS = 500;
-
   private static final int EXECUTOR_CAPACITY = 20;
 
   // This schedule index will make sure that task group events are not ignored
@@ -103,13 +101,13 @@ public final class BatchSingleJobSchedulerTest {
     irDAGBuilder = initializeDAGBuilder();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
-    pendingTaskGroupQueue = new SingleJobTaskGroupQueue();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, TEST_TIMEOUT_MS);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupQueue);
+    pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
+    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupQueue,
+        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupCollection,
             blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
index 9d067b0..8bfd993 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java
@@ -83,7 +83,7 @@ public final class FaultToleranceTest {
   private ExecutorRegistry executorRegistry;
 
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskGroupQueue pendingTaskGroupQueue;
+  private PendingTaskGroupCollection pendingTaskGroupCollection;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
@@ -91,7 +91,6 @@ public final class FaultToleranceTest {
   private final ExecutorService serExecutorService = Executors.newSingleThreadExecutor();
   private PhysicalPlanGenerator physicalPlanGenerator;
 
-  private static final int TEST_TIMEOUT_MS = 500;
   private static final int MAX_SCHEDULE_ATTEMPT = 3;
 
   @Before
@@ -111,16 +110,16 @@ public final class FaultToleranceTest {
                               final boolean useMockSchedulerRunner) throws InjectionException {
     executorRegistry = Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
 
-    pendingTaskGroupQueue = new SingleJobTaskGroupQueue();
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, TEST_TIMEOUT_MS);
+    pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
     } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupQueue);
+      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection);
     }
     scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupQueue,
+        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupCollection,
             blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler);
 
     // Add nodes
@@ -208,14 +207,14 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
 
         // Due to round robin scheduling, "a2" is assured to have a running TaskGroup.
         scheduler.onExecutorRemoved("a2");
@@ -225,15 +224,15 @@ public final class FaultToleranceTest {
         }
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2);
 
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else {
         // There are 2 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 2.
         // Schedule only the first TaskGroup
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, true);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, true);
 
         boolean first = true;
         for (final String taskGroupId : stage.getTaskGroupIds()) {
@@ -285,15 +284,15 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
@@ -304,7 +303,7 @@ public final class FaultToleranceTest {
         }
 
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 3);
-        assertFalse(pendingTaskGroupQueue.isEmpty());
+        assertFalse(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId -> {
           assertEquals(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(),
               TaskGroupState.State.READY);
@@ -345,14 +344,14 @@ public final class FaultToleranceTest {
       if (stage.getScheduleGroupIndex() == 0) {
 
         // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
-        assertTrue(pendingTaskGroupQueue.isEmpty());
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
+        assertTrue(pendingTaskGroupCollection.isEmpty());
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
               taskGroupId, TaskGroupState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupQueue, schedulingPolicy, jobStateManager, false);
+        RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, false);
 
         stage.getTaskGroupIds().forEach(taskGroupId ->
           RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
index 72b19b7..22068a5 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -41,10 +41,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.*;
 
 /**
@@ -53,8 +51,6 @@ import static org.mockito.Mockito.*;
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(JobStateManager.class)
 public final class RoundRobinSchedulingPolicyTest {
-  private static final int TIMEOUT_MS = 2000;
-
   private SchedulingPolicy schedulingPolicy;
   private ExecutorRegistry executorRegistry;
   private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class);
@@ -68,7 +64,7 @@ public final class RoundRobinSchedulingPolicyTest {
   public void setUp() throws InjectionException {
     executorRegistry = Tang.Factory.getTang().newInjector().getInstance(ExecutorRegistry.class);
 
-    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry, TIMEOUT_MS);
+    schedulingPolicy = new RoundRobinSchedulingPolicy(executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
@@ -106,27 +102,6 @@ public final class RoundRobinSchedulingPolicyTest {
   }
 
   @Test
-  public void testWakeupFromAwaitByTaskGroupCompletion() {
-    final Timer timer = new Timer();
-    final List<ScheduledTaskGroup> scheduledTaskGroups =
-        convertToScheduledTaskGroups(5, new byte[0], "Stage", ExecutorPlacementProperty.RESERVED);
-    assertTrue(schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(0), jobStateManager));
-    timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        schedulingPolicy.onTaskGroupExecutionComplete(RESERVED_EXECUTOR_ID,
-            scheduledTaskGroups.get(0).getTaskGroupId());
-      }
-    }, 1000);
-    assertTrue(schedulingPolicy.scheduleTaskGroup(scheduledTaskGroups.get(1), jobStateManager));
-  }
-
-  @Test
-  public void checkScheduleTimeout() {
-    assertEquals(schedulingPolicy.getScheduleTimeoutMs(), TIMEOUT_MS);
-  }
-
-  @Test
   public void testNoneContainerType() {
     final int slots = 6;
     final List<ScheduledTaskGroup> scheduledTaskGroups =
@@ -143,67 +118,6 @@ public final class RoundRobinSchedulingPolicyTest {
     assertFalse(isScheduled);
   }
 
-  @Test
-  public void testSingleCoreTwoTypesOfExecutors() {
-    final List<ScheduledTaskGroup> scheduledTaskGroupsA =
-        convertToScheduledTaskGroups(5, new byte[0], "Stage A", ExecutorPlacementProperty.COMPUTE);
-    final List<ScheduledTaskGroup> scheduledTaskGroupsB =
-        convertToScheduledTaskGroups(3, new byte[0], "Stage B", ExecutorPlacementProperty.TRANSIENT);
-
-
-    boolean a0 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(0), jobStateManager);
-    assertTrue(a0);
-
-    boolean a1 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(1), jobStateManager);
-    assertTrue(a1);
-
-    boolean a2 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(2), jobStateManager);
-    assertTrue(a2);
-
-    boolean a3 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(3), jobStateManager);
-    // After 2000 ms
-    assertFalse(a3);
-
-    schedulingPolicy.onTaskGroupExecutionComplete("a1", scheduledTaskGroupsA.get(0).getTaskGroupId());
-
-    a3 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(3), jobStateManager);
-    assertTrue(a3);
-
-    boolean a4 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(4), jobStateManager);
-    // After 2000 ms
-    assertFalse(a4);
-
-    schedulingPolicy.onTaskGroupExecutionComplete("a3", scheduledTaskGroupsA.get(2).getTaskGroupId());
-
-    a4 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsA.get(4), jobStateManager);
-    assertTrue(a4);
-
-    boolean b0 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(0), jobStateManager);
-    assertTrue(b0);
-
-    boolean b1 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(1), jobStateManager);
-    assertTrue(b1);
-
-    boolean b2 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(2), jobStateManager);
-    // After 2000 ms
-    assertFalse(b2);
-
-    schedulingPolicy.onTaskGroupExecutionComplete("b1", scheduledTaskGroupsB.get(0).getTaskGroupId());
-
-    b2 = schedulingPolicy.scheduleTaskGroup(scheduledTaskGroupsB.get(2), jobStateManager);
-    assertTrue(b2);
-
-    Set<String> executingTaskGroups = schedulingPolicy.onExecutorRemoved("b1");
-    assertEquals(1, executingTaskGroups.size());
-    assertEquals(scheduledTaskGroupsB.get(2).getTaskGroupId(), executingTaskGroups.iterator().next());
-
-    executingTaskGroups = schedulingPolicy.onExecutorRemoved("a1");
-    assertEquals(1, executingTaskGroups.size());
-    assertEquals(scheduledTaskGroupsA.get(3).getTaskGroupId(), executingTaskGroups.iterator().next());
-
-    verify(mockMsgSender, times(8)).send(anyObject());
-  }
-
   /**
    * Wrap a DAG of a task group into {@link ScheduledTaskGroup}s.
    *
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
index d48192f..41bdb4d 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java
@@ -29,13 +29,14 @@ import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupQueue;
+import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupCollection;
 import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy;
 import org.apache.reef.tang.Injector;
 import org.apache.reef.tang.Tang;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -48,11 +49,11 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 /**
- * Tests {@link SingleJobTaskGroupQueue}.
+ * Tests {@link SingleJobTaskGroupCollection}.
  */
 public final class SingleTaskGroupQueueTest {
   private DAGBuilder<IRVertex, IREdge> irDAGBuilder;
-  private SingleJobTaskGroupQueue pendingTaskGroupPriorityQueue;
+  private SingleJobTaskGroupCollection pendingTaskGroupPriorityQueue;
   private PhysicalPlanGenerator physicalPlanGenerator;
 
   /**
@@ -63,7 +64,7 @@ public final class SingleTaskGroupQueueTest {
   @Before
   public void setUp() throws Exception{
     irDAGBuilder = new DAGBuilder<>();
-    pendingTaskGroupPriorityQueue = new SingleJobTaskGroupQueue();
+    pendingTaskGroupPriorityQueue = new SingleJobTaskGroupCollection();
     executorService = Executors.newFixedThreadPool(2);
 
     final Injector injector = Tang.Factory.getTang().newInjector();
@@ -72,7 +73,7 @@ public final class SingleTaskGroupQueueTest {
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupQueue}.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupCollection}.
    * Tests whether the dequeued TaskGroups are according to the stage-dependency priority.
    */
   @Test
@@ -117,7 +118,7 @@ public final class SingleTaskGroupQueueTest {
     final AtomicBoolean passed = new AtomicBoolean(true);
 
     // This mimics Batch Scheduler's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       // First schedule the children TaskGroups (since it is push).
       // BatchSingleJobScheduler will schedule TaskGroups in this order as well.
       scheduleStage(dagOf2Stages.get(1));
@@ -125,18 +126,18 @@ public final class SingleTaskGroupQueueTest {
       scheduleStage(dagOf2Stages.get(0));
 
       countDownLatch.countDown();
-    });
+    }).get();
 
     // This mimics SchedulerRunner's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = pendingTaskGroupPriorityQueue.dequeue().get();
+        final ScheduledTaskGroup dequeuedTaskGroup = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
             dagOf2Stages.get(1).getId());
 
-        // Let's say we fail to schedule, and enqueue this TaskGroup back.
-        pendingTaskGroupPriorityQueue.enqueue(dequeuedTaskGroup);
+        // Let's say we fail to schedule, and add this TaskGroup back.
+        pendingTaskGroupPriorityQueue.add(dequeuedTaskGroup);
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
 
         // Now that we've dequeued all of the children TaskGroups, we should now start getting the parents.
@@ -148,14 +149,14 @@ public final class SingleTaskGroupQueueTest {
       } finally {
         countDownLatch.countDown();
       }
-    });
+    }).get();
 
     countDownLatch.await();
     assertTrue(passed.get());
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupQueue}.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupCollection}.
    * Tests whether the dequeued TaskGroups are according to the stage-dependency priority.
    */
   @Test
@@ -199,23 +200,23 @@ public final class SingleTaskGroupQueueTest {
     final CountDownLatch countDownLatch = new CountDownLatch(2);
 
     // This mimics Batch Scheduler's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       // First schedule the parent TaskGroups (since it is pull).
       // BatchSingleJobScheduler will schedule TaskGroups in this order as well.
       scheduleStage(dagOf2Stages.get(0));
       countDownLatch.countDown();
-    });
+    }).get();
 
     // This mimics SchedulerRunner's behavior
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = pendingTaskGroupPriorityQueue.dequeue().get();
+        final ScheduledTaskGroup dequeuedTaskGroup = dequeue();
         assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
             dagOf2Stages.get(0).getId());
 
-        // Let's say we fail to schedule, and enqueue this TaskGroup back.
-        pendingTaskGroupPriorityQueue.enqueue(dequeuedTaskGroup);
+        // Let's say we fail to schedule, and add this TaskGroup back.
+        pendingTaskGroupPriorityQueue.add(dequeuedTaskGroup);
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
 
         // Now that we've dequeued all of the children TaskGroups, we should now schedule children.
@@ -228,102 +229,18 @@ public final class SingleTaskGroupQueueTest {
       } finally {
         countDownLatch.countDown();
       }
-    });
+    }).get();
 
     countDownLatch.await();
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupQueue}.
-   * Tests whether the dequeued TaskGroups are according to the stage-dependency priority.
-   */
-  @Test
-  public void testPushRemoveAndAddStageDependency() throws Exception {
-    final Transform t = mock(Transform.class);
-    final IRVertex v1 = new OperatorVertex(t);
-    v1.setProperty(ParallelismProperty.of(3));
-    v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v1);
-
-    final IRVertex v2 = new OperatorVertex(t);
-    v2.setProperty(ParallelismProperty.of(2));
-    v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v2);
-
-    final IRVertex v3 = new OperatorVertex(t);
-    v3.setProperty(ParallelismProperty.of(2));
-    v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE));
-    irDAGBuilder.addVertex(v3);
-
-    final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e1);
-
-    final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER);
-    irDAGBuilder.connectVertices(e2);
-
-    final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(),
-        new TestPolicy(true), "");
-
-    final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator);
-
-    pendingTaskGroupPriorityQueue.onJobScheduled(
-        new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()));
-
-    final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort();
-
-    // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
-    assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex());
-
-    final CountDownLatch countDownLatch = new CountDownLatch(2);
-
-    // This mimics SchedulerRunner's behavior, but let's schedule this thread first this time,
-    // as opposed to testPushPriority.
-    executorService.execute(() -> {
-      try {
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = pendingTaskGroupPriorityQueue.dequeue().get();
-        assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
-            dagOf2Stages.get(1).getId());
-
-        // SchedulerRunner will never dequeue another TaskGroup before enquing back the failed TaskGroup,
-        // but just for testing purposes of PendingTGPQ...
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
-        // Let's say we fail to schedule, and enqueue this TaskGroup back.
-        pendingTaskGroupPriorityQueue.enqueue(dequeuedTaskGroup);
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-
-        // Now that we've dequeued all of the children TaskGroups, we should now start getting the parents.
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-      } catch (Exception e) {
-        e.printStackTrace();
-      } finally {
-        countDownLatch.countDown();
-      }
-    });
-
-    // This mimics Batch Scheduler's behavior
-    executorService.execute(() -> {
-      // First schedule the children TaskGroups (since it is push).
-      // BatchSingleJobScheduler will schedule TaskGroups in this order as well.
-      scheduleStage(dagOf2Stages.get(1));
-      // Then, schedule the parent TaskGroups.
-      scheduleStage(dagOf2Stages.get(0));
-
-      countDownLatch.countDown();
-    });
-
-    countDownLatch.await();
-  }
-
-  /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupQueue}.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupCollection}.
    * Tests whether the dequeued TaskGroups are according to the stage-dependency priority,
    * while concurrently scheduling TaskGroups that have dependencies, but are of different container types.
    */
   @Test
-  public void testContainerTypeAwareness() throws Exception {
+  public void testWithDifferentContainerType() throws Exception {
     final Transform t = mock(Transform.class);
     final IRVertex v1 = new OperatorVertex(t);
     v1.setProperty(ParallelismProperty.of(3));
@@ -370,24 +287,23 @@ public final class SingleTaskGroupQueueTest {
     countDownLatch.countDown();
 
     // This mimics SchedulerRunner's behavior.
-    executorService.execute(() -> {
+    executorService.submit(() -> {
       try {
-        // Since Stage-0 and Stage-1 have different container types, they should simply alternate turns in scheduling.
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
 
-        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
 
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
 
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
+
+        assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
       } catch (Exception e) {
         e.printStackTrace();
       } finally {
         countDownLatch.countDown();
       }
-    });
+    }).get();
 
     countDownLatch.await();
   }
@@ -398,7 +314,7 @@ public final class SingleTaskGroupQueueTest {
    */
   private void scheduleStage(final PhysicalStage stage) {
     stage.getTaskGroupIds().forEach(taskGroupId ->
-        pendingTaskGroupPriorityQueue.enqueue(new ScheduledTaskGroup(
+        pendingTaskGroupPriorityQueue.add(new ScheduledTaskGroup(
             "TestPlan", stage.getSerializedTaskGroupDag(), taskGroupId, Collections.emptyList(),
             Collections.emptyList(), 0, stage.getContainerType(), Collections.emptyMap())));
   }
@@ -408,7 +324,17 @@ public final class SingleTaskGroupQueueTest {
    * @return the stage name of the dequeued task group.
    */
   private String dequeueAndGetStageId() {
-    final ScheduledTaskGroup scheduledTaskGroup = pendingTaskGroupPriorityQueue.dequeue().get();
+    final ScheduledTaskGroup scheduledTaskGroup = dequeue();
     return RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
   }
+
+  /**
+   * Dequeues a scheduled task group from the task group priority queue.
+   * @return the TaskGroup dequeued
+   */
+  private ScheduledTaskGroup dequeue() {
+    final Collection<ScheduledTaskGroup> scheduledTaskGroups
+        = pendingTaskGroupPriorityQueue.peekSchedulableTaskGroups().get();
+    return pendingTaskGroupPriorityQueue.remove(scheduledTaskGroups.iterator().next().getTaskGroupId());
+  }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index baf18cf..143ecb4 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -61,13 +61,14 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   private SpiedSchedulingPolicyWrapper<RoundRobinSchedulingPolicy> roundRobin;
   private MockJobStateManagerWrapper jobStateManager;
 
-  private void setup(final int schedulerTimeoutMs) {
+  @Before
+  public void setup() {
     final Injector injector = Tang.Factory.getTang().newInjector();
     jobStateManager = new MockJobStateManagerWrapper();
 
     final ExecutorRegistry executorRegistry = new ExecutorRegistry();
     final RoundRobinSchedulingPolicy roundRobinSchedulingPolicy =
-        new RoundRobinSchedulingPolicy(executorRegistry, schedulerTimeoutMs);
+        new RoundRobinSchedulingPolicy(executorRegistry);
     roundRobin = new SpiedSchedulingPolicyWrapper(roundRobinSchedulingPolicy, jobStateManager.get());
 
     injector.bindVolatileInstance(RoundRobinSchedulingPolicy.class, roundRobin.get());
@@ -92,8 +93,6 @@ public final class SourceLocationAwareSchedulingPolicyTest {
    */
   @Test
   public void testRoundRobinSchedulerFallback() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withoutReadables(ExecutorPlacementProperty.NONE);
     final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithoutSourceLocations(2,
@@ -131,8 +130,6 @@ public final class SourceLocationAwareSchedulingPolicyTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingNotAvailable() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)), ExecutorPlacementProperty.NONE);
@@ -156,8 +153,6 @@ public final class SourceLocationAwareSchedulingPolicyTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingWithContainerType() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
@@ -190,8 +185,6 @@ public final class SourceLocationAwareSchedulingPolicyTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingDoesNotOverSchedule() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)), CONTAINER_TYPE_A);
@@ -221,8 +214,6 @@ public final class SourceLocationAwareSchedulingPolicyTest {
    */
   @Test
   public void testSourceLocationAwareSchedulingWithMultiSource() {
-    setup(500);
-
     // Prepare test scenario
     final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
@@ -247,62 +238,6 @@ public final class SourceLocationAwareSchedulingPolicyTest {
     e.assertScheduledTaskGroups(Arrays.asList(tg0, tg1, tg2, tg3));
   }
 
-  /**
-   * If there are no appropriate executors available, {@link SourceLocationAwareSchedulingPolicy} should await
-   * for the given amount of time, immediately waking up on executor addition.
-   */
-  @Test
-  public void testWakeupFromAwaitByExecutorAddition() {
-    // We need timeout value which is long enough.
-    setup(20000);
-    final Timer timer = new Timer();
-
-    // Prepare test scenario
-    final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1);
-
-    // The executor will be available in 1000ms.
-    timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        addExecutor(e);
-      }
-    }, 1000);
-    // Attempt to schedule TG must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg, jobStateManager.get()));
-  }
-
-  /**
-   * If there are no appropriate executors available, {@link SourceLocationAwareSchedulingPolicy} should await
-   * for the given amount of time, immediately waking up on TaskGroup completion.
-   */
-  @Test
-  public void testWakeupFromAwaitByTaskGroupCompletion() {
-    // We need timeout value which is long enough.
-    setup(20000);
-    final Timer timer = new Timer();
-
-    // Prepare test scenario
-    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
-    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
-        Collections.singletonList(Collections.singletonList(SITE_1)), CONTAINER_TYPE_A);
-    final MockExecutorRepresenterWrapper e = addExecutor(new MockExecutorRepresenterWrapper(SITE_1, CONTAINER_TYPE_A, 1));
-
-    // Attempt to schedule TG must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg0, jobStateManager.get()));
-    // The TaskGroup will be completed in 1000ms.
-    timer.schedule(new TimerTask() {
-      @Override
-      public void run() {
-        sourceLocationAware.onTaskGroupExecutionComplete(e.get().getExecutorId(), tg0.getTaskGroupId());
-      }
-    }, 1000);
-    // Attempt to schedule TG must success
-    assertTrue(sourceLocationAware.scheduleTaskGroup(tg1, jobStateManager.get()));
-  }
-
   private MockExecutorRepresenterWrapper addExecutor(final MockExecutorRepresenterWrapper executor) {
     sourceLocationAware.onExecutorAdded(executor.get());
     return executor;
@@ -362,7 +297,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
           final Readable readable = mock(Readable.class);
-          when(readable.getLocations()).thenThrow(new Exception("EXCEPTION"));
+          when(readable.getLocations()).thenThrow(new UnsupportedOperationException());
           readables.add(readable);
         }
         return doCreate(readables, containerType);

-- 
To stop receiving notification emails like this one, please contact
sanha@apache.org.

Mime
View raw message