nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jeongy...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-76] Rename TaskGroup to Task (#20)
Date Mon, 21 May 2018 06:28:17 GMT
This is an automated email from the ASF dual-hosted git repository.

jeongyoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 2958d84  [NEMO-76] Rename TaskGroup to Task (#20)
2958d84 is described below

commit 2958d8438ef6d9726868cdebdba79d6f29e70b5c
Author: John Yang <johnyangk@gmail.com>
AuthorDate: Mon May 21 15:28:15 2018 +0900

    [NEMO-76] Rename TaskGroup to Task (#20)
    
    JIRA: NEMO-76: Rename TaskGroup to Task
    
    Major changes:
    
    Appropriately renames things like 'TaskGroup', 'taskGroup', 'Task group', and 'task group'
    Minor changes to note:
    
    Removes the legacy TaskState
    Tests for the changes:
    
    This PR only changes names, and removes TaskState that isn't used anywhere
    The existing tests continue to pass
    Other comments:
    
    The legacy 'Task' will be handled with https://issues.apache.org/jira/browse/NEMO-79
    Once this PR is merged, I'll work on that ASAP to remove the confusion between the two 'Task's.
    
    resolves NEMO-76
---
 README.md                                          |   2 +-
 bin/json2dot.py                                    |  36 +--
 .../nemo/common/exception/SchedulingException.java |   2 +-
 conf/src/main/java/edu/snu/nemo/conf/JobConf.java  |  10 +-
 .../nemo/runtime/common/RuntimeIdGenerator.java    |  36 +--
 .../eventhandler/DynamicOptimizationEvent.java     |   2 +-
 .../eventhandler/UpdatePhysicalPlanEvent.java      |   4 +-
 .../common/message/ncs/NcsMessageEnvironment.java  |   4 +-
 .../snu/nemo/runtime/common/metric/MetricData.java |   2 +-
 .../pass/runtime/DataSkewRuntimePass.java          |  30 +--
 .../plan/physical/PhysicalPlanGenerator.java       |   8 +-
 .../common/plan/physical/PhysicalStage.java        |  44 ++--
 .../common/plan/physical/PhysicalStageEdge.java    |  24 +-
 .../common/plan/physical/ScheduledTask.java        | 139 ++++++++++
 .../common/plan/physical/ScheduledTaskGroup.java   | 139 ----------
 .../nemo/runtime/common/plan/physical/Task.java    |   2 +-
 .../snu/nemo/runtime/common/state/BlockState.java  |   4 +-
 .../snu/nemo/runtime/common/state/JobState.java    |   4 +-
 .../snu/nemo/runtime/common/state/StageState.java  |  14 +-
 .../nemo/runtime/common/state/TaskGroupState.java  | 105 --------
 .../snu/nemo/runtime/common/state/TaskState.java   |  53 ++--
 runtime/common/src/main/proto/ControlMessage.proto |  20 +-
 .../edu/snu/nemo/runtime/executor/Executor.java    |  49 ++--
 .../{TaskGroupExecutor.java => TaskExecutor.java}  | 126 +++++-----
 ...roupStateManager.java => TaskStateManager.java} |  96 +++----
 .../runtime/executor/datatransfer/InputReader.java |   2 +-
 .../executor/datatransfer/OutputCollectorImpl.java |   2 +-
 .../executor/datatransfer/TaskDataHandler.java     |   8 +-
 .../nemo/runtime/master/BlockManagerMaster.java    |  70 +++---
 .../snu/nemo/runtime/master/JobStateManager.java   | 144 ++++++-----
 .../nemo/runtime/master/MetricMessageHandler.java  |   2 +-
 .../edu/snu/nemo/runtime/master/RuntimeMaster.java |  94 +++----
 .../master/resource/ExecutorRepresenter.java       |  86 +++----
 .../master/resource/ResourceSpecification.java     |   4 +-
 .../master/scheduler/BatchSingleJobScheduler.java  | 279 +++++++++++----------
 .../scheduler/CompositeSchedulingPolicy.java       |   6 +-
 .../ContainerTypeAwareSchedulingPolicy.java        |  12 +-
 .../master/scheduler/FreeSlotSchedulingPolicy.java |  10 +-
 ...pCollection.java => PendingTaskCollection.java} |  46 ++--
 .../scheduler/RoundRobinSchedulingPolicy.java      |  12 +-
 .../nemo/runtime/master/scheduler/Scheduler.java   |  32 +--
 .../runtime/master/scheduler/SchedulerRunner.java  |  60 ++---
 .../runtime/master/scheduler/SchedulingPolicy.java |   4 +-
 ...ollection.java => SingleJobTaskCollection.java} | 104 ++++----
 .../SourceLocationAwareSchedulingPolicy.java       |  12 +-
 .../scheduler/BatchSingleJobSchedulerTest.java     |  14 +-
 .../ContainerTypeAwareSchedulingPolicyTest.java    |  16 +-
 .../scheduler/FaultToleranceTest.java              | 128 +++++-----
 .../scheduler/FreeSlotSchedulingPolicyTest.java    |  16 +-
 .../scheduler/RoundRobinSchedulingPolicyTest.java  |  16 +-
 .../scheduler/SchedulerTestUtil.java               |  86 +++----
 ...roupQueueTest.java => SingleTaskQueueTest.java} |  96 +++----
 .../SourceLocationAwareSchedulingPolicyTest.java   |  44 ++--
 .../compiler/backend/nemo/NemoBackendTest.java     |   4 +-
 .../pass/runtime/DataSkewRuntimePassTest.java      |   4 +-
 .../runtime/common/plan/DAGConverterTest.java      |  12 +-
 ...roupExecutorTest.java => TaskExecutorTest.java} |  52 ++--
 .../executor/datatransfer/DataTransferTest.java    |  26 +-
 .../runtime/master/BlockManagerMasterTest.java     |  22 +-
 .../tests/runtime/master/JobStateManagerTest.java  |  20 +-
 60 files changed, 1194 insertions(+), 1306 deletions(-)

diff --git a/README.md b/README.md
index 496cc39..b50b917 100644
--- a/README.md
+++ b/README.md
@@ -113,7 +113,7 @@ Please refer to the [Contribution guideline](.github/CONTRIBUTING.md) to contrib
   * `Reserved` : Containers that store eviction-free resources. `Reserved` containers are used to reliably store intermediate data which have high eviction cost.
   * `Compute` : Containers that are mainly used for computation.
 * `memory_mb`: Memory size in MB
-* `capacity`: Number of `TaskGroup`s that can be run in an executor. Set this value to be the same as the number of CPU cores of the container.
+* `capacity`: Number of `Task`s that can be run in an executor. Set this value to be the same as the number of CPU cores of the container.
 
 ### Examples
 ```json
diff --git a/bin/json2dot.py b/bin/json2dot.py
index 9fac96d..12f0d48 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -63,27 +63,27 @@ class PhysicalStageState:
     def __init__(self, data):
         self.id = data['id']
         self.state = data['state']
-        self.taskGroups = {}
-        for taskGroup in data['taskGroups']:
-            self.taskGroups[taskGroup['id']] = TaskGroupState(taskGroup)
+        self.tasks = {}
+        for task in data['tasks']:
+            self.tasks[task['id']] = TaskState(task)
     @classmethod
     def empty(cls):
-        return cls({'id': None, 'state': None, 'taskGroups': []})
+        return cls({'id': None, 'state': None, 'tasks': []})
     def get(self, id):
         try:
-            return self.taskGroups[id]
+            return self.tasks[id]
         except:
-            return TaskGroupState.empty()
+            return TaskState.empty()
     @property
-    def taskGroupStateSummary(self):
-        stateToNumTaskGroups = dict()
-        for taskGroupState in self.taskGroups.values():
-            before = stateToNumTaskGroups.get(taskGroupState.state, 0)
-            stateToNumTaskGroups[taskGroupState.state] = before + 1
-        return '\\n'.join(['{}: {}'.format(state, stateToNumTaskGroups[state])
-            for state in stateToNumTaskGroups.keys()])
+    def taskStateSummary(self):
+        stateToNumTasks = dict()
+        for taskState in self.tasks.values():
+            before = stateToNumTasks.get(taskState.state, 0)
+            stateToNumTasks[taskState.state] = before + 1
+        return '\\n'.join(['{}: {}'.format(state, stateToNumTasks[state])
+            for state in stateToNumTasks.keys()])
 
-class TaskGroupState:
+class TaskState:
     def __init__(self, data):
         self.id = data['id']
         self.state = data['state']
@@ -252,7 +252,7 @@ class LoopVertex:
 class PhysicalStage:
     def __init__(self, id, properties, state):
         self.id = id
-        self.taskGroup = DAG(properties['taskGroupDag'], JobState.empty())
+        self.task = DAG(properties['taskDag'], JobState.empty())
         self.idx = getIdx()
         self.state = state
     @property
@@ -262,14 +262,14 @@ class PhysicalStage:
         else:
             state = ' ({})'.format(self.state.state)
         dot = 'subgraph cluster_{} {{'.format(self.idx)
-        dot += 'label = "{}{}\\n\\n{} TaskGroup(s):\\n{}";'.format(self.id, state, len(self.state.taskGroups), self.state.taskGroupStateSummary)
+        dot += 'label = "{}{}\\n\\n{} Task(s):\\n{}";'.format(self.id, state, len(self.state.tasks), self.state.taskStateSummary)
         dot += 'color=red; bgcolor="{}";'.format(stateToColor(self.state.state))
-        dot += self.taskGroup.dot
+        dot += self.task.dot
         dot += '}'
         return dot
     @property
     def oneVertex(self):
-        return next(iter(self.taskGroup.vertices.values())).oneVertex
+        return next(iter(self.task.vertices.values())).oneVertex
     @property
     def logicalEnd(self):
         return 'cluster_{}'.format(self.idx)
diff --git a/common/src/main/java/edu/snu/nemo/common/exception/SchedulingException.java b/common/src/main/java/edu/snu/nemo/common/exception/SchedulingException.java
index 856dc36..3b7322e 100644
--- a/common/src/main/java/edu/snu/nemo/common/exception/SchedulingException.java
+++ b/common/src/main/java/edu/snu/nemo/common/exception/SchedulingException.java
@@ -18,7 +18,7 @@ package edu.snu.nemo.common.exception;
 /**
  * SchedulingException.
  * Thrown when any exception occurs while trying to schedule
- * a {edu.snu.nemo.runtime.common.plan.physical.TaskGroup} to an executor.
+ * a {edu.snu.nemo.runtime.common.plan.physical.Task} to an executor.
  */
 public final class SchedulingException extends RuntimeException {
   /**
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 9bfc870..7f052e3 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -124,10 +124,10 @@ public final class JobConf extends ConfigurationModuleBuilder {
 
   /**
    * Executor capacity.
-   * Determines the number of TaskGroup 'slots' for each executor.
-   * 1) Master's TaskGroup scheduler can use this number in scheduling.
-   *    (e.g., schedule TaskGroup to the executor currently with the maximum number of available slots)
-   * 2) Executor's number of TaskGroup execution threads is set to this number.
+   * Determines the number of Task 'slots' for each executor.
+   * 1) Master's Task scheduler can use this number in scheduling.
+   *    (e.g., schedule Task to the executor currently with the maximum number of available slots)
+   * 2) Executor's number of Task execution threads is set to this number.
    */
   @NamedParameter(doc = "Executor capacity", short_name = "executor_capacity", default_value = "1")
   public final class ExecutorCapacity implements Name<Integer> {
@@ -142,7 +142,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
-   * Max number of attempts for task group scheduling.
+   * Max number of attempts for task scheduling.
    */
   @NamedParameter(doc = "Max number of schedules", short_name = "max_schedule_attempt", default_value = "3")
   public final class MaxScheduleAttempt implements Name<Integer> {
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index 90ff6e8..dd19b27 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -28,7 +28,7 @@ public final class RuntimeIdGenerator {
   private static AtomicLong resourceSpecIdGenerator = new AtomicLong(0);
   private static final String BLOCK_PREFIX = "Block-";
   private static final String BLOCK_ID_SPLITTER = "_";
-  private static final String TASK_GROUP_INFIX = "-TaskGroup-";
+  private static final String TASK_INFIX = "-Task-";
   private static final String PHYSICAL_TASK_ID_SPLITTER = "_";
 
   /**
@@ -98,15 +98,15 @@ public final class RuntimeIdGenerator {
   }
 
   /**
-   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup}.
+   * Generates the ID for {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
    *
-   * @param index   the index of this task group.
+   * @param index   the index of this task.
    * @param stageId the ID of the stage.
    * @return the generated ID
    */
-  public static String generateTaskGroupId(final int index,
+  public static String generateTaskId(final int index,
                                            final String stageId) {
-    return stageId + TASK_GROUP_INFIX + index;
+    return stageId + TASK_INFIX + index;
   }
 
   /**
@@ -181,34 +181,34 @@ public final class RuntimeIdGenerator {
   }
 
   /**
-   * Extracts stage ID from a task group ID.
+   * Extracts stage ID from a task ID.
    *
-   * @param taskGroupId the task group ID to extract.
+   * @param taskId the task ID to extract.
    * @return the stage ID.
    */
-  public static String getStageIdFromTaskGroupId(final String taskGroupId) {
-    return parseTaskGroupId(taskGroupId)[0];
+  public static String getStageIdFromTaskId(final String taskId) {
+    return parseTaskId(taskId)[0];
   }
 
   /**
-   * Extracts task group index from a task group ID.
+   * Extracts task index from a task ID.
    *
-   * @param taskGroupId the task group ID to extract.
+   * @param taskId the task ID to extract.
    * @return the index.
    */
-  public static int getIndexFromTaskGroupId(final String taskGroupId) {
-    return Integer.valueOf(parseTaskGroupId(taskGroupId)[1]);
+  public static int getIndexFromTaskId(final String taskId) {
+    return Integer.valueOf(parseTaskId(taskId)[1]);
   }
 
   /**
-   * Parses a task group id.
-   * The result array will contain the stage id and the index of the task group in order.
+   * Parses a task id.
+   * The result array will contain the stage id and the index of the task in order.
    *
-   * @param taskGroupId to parse.
+   * @param taskId to parse.
    * @return the array of parsed information.
    */
-  private static String[] parseTaskGroupId(final String taskGroupId) {
-    return taskGroupId.split(TASK_GROUP_INFIX);
+  private static String[] parseTaskId(final String taskId) {
+    return taskId.split(TASK_INFIX);
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index f6fa6e3..e63f1d0 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -60,7 +60,7 @@ public final class DynamicOptimizationEvent implements RuntimeEvent {
   }
 
   /**
-   * @return the information of the task at which this optimization occurs: its name and its task group ID.
+   * @return the information of the task at which this optimization occurs: its name and its task ID.
    */
   public Pair<String, String> getTaskInfo() {
     return this.taskInfo;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
index 5e424d1..ed576bd 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/UpdatePhysicalPlanEvent.java
@@ -29,7 +29,7 @@ public final class UpdatePhysicalPlanEvent implements CompilerEvent {
   /**
    * Constructor.
    * @param newPhysicalPlan the newly optimized physical plan.
-   * @param taskInfo information of the task at which this optimization occurs: its name and its task group ID.
+   * @param taskInfo information of the task at which this optimization occurs: its name and its task ID.
    */
   UpdatePhysicalPlanEvent(final PhysicalPlan newPhysicalPlan,
                           final Pair<String, String> taskInfo) {
@@ -45,7 +45,7 @@ public final class UpdatePhysicalPlanEvent implements CompilerEvent {
   }
 
   /**
-   * @return the information of the task at which this optimization occurs: its name and its task group ID.
+   * @return the information of the task at which this optimization occurs: its name and its task ID.
    */
   public Pair<String, String> getTaskInfo() {
     return this.taskInfo;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
index 3d37a60..5dfcd6c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/message/ncs/NcsMessageEnvironment.java
@@ -197,8 +197,8 @@ public final class NcsMessageEnvironment implements MessageEnvironment {
 
   private MessageType getMsgType(final ControlMessage.Message controlMessage) {
     switch (controlMessage.getType()) {
-      case TaskGroupStateChanged:
-      case ScheduleTaskGroup:
+      case TaskStateChanged:
+      case ScheduleTask:
       case BlockStateChanged:
       case ExecutorFailed:
       case DataSizeMetric:
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java
index 2cb9e3a..a5d8d3e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java
@@ -25,7 +25,7 @@ import java.util.Map;
  */
 public class MetricData {
   /**
-   * Computation units are: Job, State, TaskGroup and Task.
+   * Computation units are: Job, State, Task.
    */
   private final String computationUnitId;
   private final ObjectMapper objectMapper;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 8b75000..b9d263b 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -73,18 +73,18 @@ public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<P
         .collect(Collectors.toList());
 
     // Get number of evaluators of the next stage (number of blocks).
-    final Integer taskGroupListSize = optimizationEdges.stream().findFirst().orElseThrow(() ->
-        new RuntimeException("optimization edges are empty")).getDst().getTaskGroupIds().size();
+    final Integer taskListSize = optimizationEdges.stream().findFirst().orElseThrow(() ->
+        new RuntimeException("optimization edges are empty")).getDst().getTaskIds().size();
 
     // Calculate keyRanges.
-    final List<KeyRange> keyRanges = calculateHashRanges(metricData, taskGroupListSize);
+    final List<KeyRange> keyRanges = calculateHashRanges(metricData, taskListSize);
 
     // Overwrite the previously assigned hash value range in the physical DAG with the new range.
     optimizationEdges.forEach(optimizationEdge -> {
       // Update the information.
-      final List<KeyRange> taskGroupIdxToHashRange = new ArrayList<>();
-      IntStream.range(0, taskGroupListSize).forEach(i -> taskGroupIdxToHashRange.add(keyRanges.get(i)));
-      optimizationEdge.setTaskGroupIdxToKeyRange(taskGroupIdxToHashRange);
+      final List<KeyRange> taskIdxToHashRange = new ArrayList<>();
+      IntStream.range(0, taskListSize).forEach(i -> taskIdxToHashRange.add(keyRanges.get(i)));
+      optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
     });
 
     return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build(), originalPlan.getTaskIRVertexMap());
@@ -94,12 +94,12 @@ public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<P
    * Method for calculating key ranges to evenly distribute the skewed metric data.
    *
    * @param metricData        the metric data.
-   * @param taskGroupListSize the size of the task group list.
+   * @param taskListSize the size of the task list.
    * @return the list of key ranges calculated.
    */
   @VisibleForTesting
   public List<KeyRange> calculateHashRanges(final Map<String, List<Pair<Integer, Long>>> metricData,
-                                            final Integer taskGroupListSize) {
+                                            final Integer taskListSize) {
     // NOTE: metricData is made up of a map of blockId to blockSizes.
     // Count the hash range (number of blocks for each block).
     final int maxHashValue = metricData.values().stream()
@@ -127,18 +127,18 @@ public final class DataSkewRuntimePass implements RuntimePass<Map<String, List<P
 
     // Do the optimization using the information derived above.
     final Long totalSize = aggregatedMetricData.values().stream().mapToLong(n -> n).sum(); // get total size
-    final Long idealSizePerTaskGroup = totalSize / taskGroupListSize; // and derive the ideal size per task group
-    LOG.info("idealSizePerTaskgroup {} = {}(totalSize) / {}(taskGroupListSize)",
-        idealSizePerTaskGroup, totalSize, taskGroupListSize);
+    final Long idealSizePerTask = totalSize / taskListSize; // and derive the ideal size per task
+    LOG.info("idealSizePerTask {} = {}(totalSize) / {}(taskListSize)",
+        idealSizePerTask, totalSize, taskListSize);
 
     // find HashRanges to apply (for each blocks of each block).
-    final List<KeyRange> keyRanges = new ArrayList<>(taskGroupListSize);
+    final List<KeyRange> keyRanges = new ArrayList<>(taskListSize);
     int startingHashValue = 0;
     int finishingHashValue = 1; // initial values
     Long currentAccumulatedSize = aggregatedMetricData.getOrDefault(startingHashValue, 0L);
-    for (int i = 1; i <= taskGroupListSize; i++) {
-      if (i != taskGroupListSize) {
-        final Long idealAccumulatedSize = idealSizePerTaskGroup * i; // where we should end
+    for (int i = 1; i <= taskListSize; i++) {
+      if (i != taskListSize) {
+        final Long idealAccumulatedSize = idealSizePerTask * i; // where we should end
         // find the point while adding up one by one.
         while (currentAccumulatedSize < idealAccumulatedSize) {
           currentAccumulatedSize += aggregatedMetricData.getOrDefault(finishingHashValue, 0L);
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
index 782f8f9..2ae7271 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalPlanGenerator.java
@@ -228,9 +228,9 @@ public final class PhysicalPlanGenerator
       final int stageParallelism = firstVertexProperties.get(ExecutionProperty.Key.Parallelism);
       final String containerType = firstVertexProperties.get(ExecutionProperty.Key.ExecutorPlacement);
 
-      // Only one task group DAG will be created and reused.
+      // Only one task DAG will be created and reused.
       final DAGBuilder<Task, RuntimeEdge<Task>> stageInternalDAGBuilder = new DAGBuilder<>();
-      // Collect split source readables in advance and bind to each scheduled task group to avoid extra source split.
+      // Collect split source readables in advance and bind to each scheduled task to avoid extra source split.
       final List<Map<String, Readable>> logicalTaskIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
         logicalTaskIdToReadables.add(new HashMap<>());
@@ -273,7 +273,7 @@ public final class PhysicalPlanGenerator
         taskIRVertexMap.put(newTaskToAdd, irVertex);
       });
 
-      // connect internal edges in the task group. It suffices to iterate over only the stage internal inEdges.
+      // connect internal edges in the task. It suffices to iterate over only the stage internal inEdges.
       final DAG<IRVertex, IREdge> stageInternalDAG = stage.getStageInternalDAG();
       stageInternalDAG.getVertices().forEach(irVertex -> {
         final List<IREdge> inEdges = stageInternalDAG.getIncomingEdgesOf(irVertex);
@@ -283,7 +283,7 @@ public final class PhysicalPlanGenerator
                 edge.getCoder(), edge.isSideInput())));
       });
 
-      // Create the task group to add for this stage.
+      // Create the task to add for this stage.
       final PhysicalStage physicalStage =
           new PhysicalStage(stage.getId(), stageInternalDAGBuilder.build(),
               stageParallelism, stage.getScheduleGroupIndex(), containerType, logicalTaskIdToReadables);
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
index 1fe3ef2..ded0a8e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStage.java
@@ -30,61 +30,61 @@ import java.util.Map;
  * PhysicalStage.
  */
 public final class PhysicalStage extends Vertex {
-  private final DAG<Task, RuntimeEdge<Task>> taskGroupDag;
+  private final DAG<Task, RuntimeEdge<Task>> taskDag;
   private final int parallelism;
   private final int scheduleGroupIndex;
   private final String containerType;
-  private final byte[] serializedTaskGroupDag;
+  private final byte[] serializedTaskDag;
   private final List<Map<String, Readable>> logicalTaskIdToReadables;
 
   /**
    * Constructor.
    *
    * @param stageId                  ID of the stage.
-   * @param taskGroupDag             the DAG of the task group in this stage.
-   * @param parallelism              how many task groups will be executed in this stage.
+   * @param taskDag                  the DAG of the task in this stage.
+   * @param parallelism              how many tasks will be executed in this stage.
    * @param scheduleGroupIndex       the schedule group index.
-   * @param containerType            the type of container to execute the task group on.
+   * @param containerType            the type of container to execute the task on.
    * @param logicalTaskIdToReadables the list of maps between logical task ID and {@link Readable}.
    */
   public PhysicalStage(final String stageId,
-                       final DAG<Task, RuntimeEdge<Task>> taskGroupDag,
+                       final DAG<Task, RuntimeEdge<Task>> taskDag,
                        final int parallelism,
                        final int scheduleGroupIndex,
                        final String containerType,
                        final List<Map<String, Readable>> logicalTaskIdToReadables) {
     super(stageId);
-    this.taskGroupDag = taskGroupDag;
+    this.taskDag = taskDag;
     this.parallelism = parallelism;
     this.scheduleGroupIndex = scheduleGroupIndex;
     this.containerType = containerType;
-    this.serializedTaskGroupDag = SerializationUtils.serialize(taskGroupDag);
+    this.serializedTaskDag = SerializationUtils.serialize(taskDag);
     this.logicalTaskIdToReadables = logicalTaskIdToReadables;
   }
 
   /**
-   * @return the task group.
+   * @return the task.
    */
-  public DAG<Task, RuntimeEdge<Task>> getTaskGroupDag() {
-    return taskGroupDag;
+  public DAG<Task, RuntimeEdge<Task>> getTaskDag() {
+    return taskDag;
   }
 
   /**
-   * @return the serialized DAG of the task group.
+   * @return the serialized DAG of the task.
    */
-  public byte[] getSerializedTaskGroupDag() {
-    return serializedTaskGroupDag;
+  public byte[] getSerializedTaskDag() {
+    return serializedTaskDag;
   }
 
   /**
-   * @return the list of the task group IDs in this stage.
+   * @return the list of the task IDs in this stage.
    */
-  public List<String> getTaskGroupIds() {
-    final List<String> taskGroupIds = new ArrayList<>();
-    for (int taskGroupIdx = 0; taskGroupIdx < parallelism; taskGroupIdx++) {
-      taskGroupIds.add(RuntimeIdGenerator.generateTaskGroupId(taskGroupIdx, getId()));
+  public List<String> getTaskIds() {
+    final List<String> taskIds = new ArrayList<>();
+    for (int taskIdx = 0; taskIdx < parallelism; taskIdx++) {
+      taskIds.add(RuntimeIdGenerator.generateTaskId(taskIdx, getId()));
     }
-    return taskGroupIds;
+    return taskIds;
   }
 
   /**
@@ -95,7 +95,7 @@ public final class PhysicalStage extends Vertex {
   }
 
   /**
-   * @return the type of container to execute the task group on.
+   * @return the type of container to execute the task on.
    */
   public String getContainerType() {
     return containerType;
@@ -112,7 +112,7 @@ public final class PhysicalStage extends Vertex {
   public String propertiesToJSON() {
     final StringBuilder sb = new StringBuilder();
     sb.append("{\"scheduleGroupIndex\": ").append(scheduleGroupIndex);
-    sb.append(", \"taskGroupDag\": ").append(taskGroupDag);
+    sb.append(", \"taskDag\": ").append(taskDag);
     sb.append(", \"parallelism\": ").append(parallelism);
     sb.append(", \"containerType\": \"").append(containerType).append("\"");
     sb.append('}');
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
index 18d18b8..13c83e8 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/PhysicalStageEdge.java
@@ -42,9 +42,9 @@ public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
   private final IRVertex dstVertex;
 
   /**
-   * The list between the task group idx and key range to read.
+   * The list between the task idx and key range to read.
    */
-  private List<KeyRange> taskGroupIdxToKeyRange;
+  private List<KeyRange> taskIdxToKeyRange;
 
   /**
    * Constructor.
@@ -69,9 +69,9 @@ public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
     // Initialize the key range of each dst task.
-    this.taskGroupIdxToKeyRange = new ArrayList<>();
-    for (int taskIdx = 0; taskIdx < dstStage.getTaskGroupIds().size(); taskIdx++) {
-      taskGroupIdxToKeyRange.add(HashRange.of(taskIdx, taskIdx + 1));
+    this.taskIdxToKeyRange = new ArrayList<>();
+    for (int taskIdx = 0; taskIdx < dstStage.getTaskIds().size(); taskIdx++) {
+      taskIdxToKeyRange.add(HashRange.of(taskIdx, taskIdx + 1));
     }
   }
 
@@ -102,17 +102,17 @@ public final class PhysicalStageEdge extends RuntimeEdge<PhysicalStage> {
   }
 
   /**
-   * @return the list between the task group idx and key range to read.
+   * @return the list between the task idx and key range to read.
    */
-  public List<KeyRange> getTaskGroupIdxToKeyRange() {
-    return taskGroupIdxToKeyRange;
+  public List<KeyRange> getTaskIdxToKeyRange() {
+    return taskIdxToKeyRange;
   }
 
   /**
-   * Sets the task group idx to key range list.
-   * @param taskGroupIdxToKeyRange the list to set.
+   * Sets the task idx to key range list.
+   * @param taskIdxToKeyRange the list to set.
    */
-  public void setTaskGroupIdxToKeyRange(final List<KeyRange> taskGroupIdxToKeyRange) {
-    this.taskGroupIdxToKeyRange = taskGroupIdxToKeyRange;
+  public void setTaskIdxToKeyRange(final List<KeyRange> taskIdxToKeyRange) {
+    this.taskIdxToKeyRange = taskIdxToKeyRange;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
new file mode 100644
index 0000000..d5f19a7
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTask.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2017 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.plan.physical;
+
+import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A ScheduledTask is a grouping of {@link Task}s that belong to a stage.
+ * Executors receive units of ScheduledTasks during job execution,
+ * and thus the resource type of all tasks of a ScheduledTask must be identical.
+ * A stage contains a list of IDs of Tasks whose length corresponds to stage/operator parallelism.
+ *
+ * This class includes all information which will be required from the executor after scheduled,
+ * including the (serialized) DAG of {@link Task}s,
+ * the incoming/outgoing edges to/from the stage the Task belongs to, and so on.
+ */
+public final class ScheduledTask implements Serializable {
+  private final String jobId;
+  private final String taskId;
+  private final int taskIdx;
+  private final List<PhysicalStageEdge> taskIncomingEdges;
+  private final List<PhysicalStageEdge> taskOutgoingEdges;
+  private final int attemptIdx;
+  private final String containerType;
+  private final byte[] serializedTaskDag;
+  private final Map<String, Readable> logicalTaskIdToReadable;
+
+  /**
+   * Constructor.
+   *
+   * @param jobId                   the id of the job.
+   * @param serializedTaskDag  the serialized DAG of the task.
+   * @param taskId             the ID of the scheduled task.
+   * @param taskIncomingEdges  the incoming edges of the task.
+   * @param taskOutgoingEdges  the outgoing edges of the task.
+   * @param attemptIdx              the attempt index.
+   * @param containerType           the type of container to execute the task on.
+   * @param logicalTaskIdToReadable the map between logical task ID and readable.
+   */
+  public ScheduledTask(final String jobId,
+                            final byte[] serializedTaskDag,
+                            final String taskId,
+                            final List<PhysicalStageEdge> taskIncomingEdges,
+                            final List<PhysicalStageEdge> taskOutgoingEdges,
+                            final int attemptIdx,
+                            final String containerType,
+                            final Map<String, Readable> logicalTaskIdToReadable) {
+    this.jobId = jobId;
+    this.taskId = taskId;
+    this.taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
+    this.taskIncomingEdges = taskIncomingEdges;
+    this.taskOutgoingEdges = taskOutgoingEdges;
+    this.attemptIdx = attemptIdx;
+    this.containerType = containerType;
+    this.serializedTaskDag = serializedTaskDag;
+    this.logicalTaskIdToReadable = logicalTaskIdToReadable;
+  }
+
+  /**
+   * @return the id of the job.
+   */
+  public String getJobId() {
+    return jobId;
+  }
+
+  /**
+   * @return the serialized DAG of the task.
+   */
+  public byte[] getSerializedTaskDag() {
+    return serializedTaskDag;
+  }
+
+  /**
+   * @return the ID of the scheduled task.
+   */
+  public String getTaskId() {
+    return taskId;
+  }
+
+  /**
+   * @return the idx of the scheduled task.
+   */
+  public int getTaskIdx() {
+    return taskIdx;
+  }
+
+  /**
+   * @return the incoming edges of the task.
+   */
+  public List<PhysicalStageEdge> getTaskIncomingEdges() {
+    return taskIncomingEdges;
+  }
+
+  /**
+   * @return the outgoing edges of the task.
+   */
+  public List<PhysicalStageEdge> getTaskOutgoingEdges() {
+    return taskOutgoingEdges;
+  }
+
+  /**
+   * @return the attempt index.
+   */
+  public int getAttemptIdx() {
+    return attemptIdx;
+  }
+
+  /**
+   * @return the type of container to execute the task on.
+   */
+  public String getContainerType() {
+    return containerType;
+  }
+
+  /**
+   * @return the map between logical task ID and readable.
+   */
+  public Map<String, Readable> getLogicalTaskIdToReadable() {
+    return logicalTaskIdToReadable;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTaskGroup.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTaskGroup.java
deleted file mode 100644
index 6f06a97..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/ScheduledTaskGroup.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.plan.physical;
-
-import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A ScheduledTaskGroup is a grouping of {@link Task}s that belong to a stage.
- * Executors receive units of ScheduledTaskGroups during job execution,
- * and thus the resource type of all tasks of a ScheduledTaskGroup must be identical.
- * A stage contains a list of IDs of TaskGroups whose length corresponds to stage/operator parallelism.
- *
- * This class includes all information which will be required from the executor after scheduled,
- * including the (serialized) DAG of {@link Task}s,
- * the incoming/outgoing edges to/from the stage the TaskGroup belongs to, and so on.
- */
-public final class ScheduledTaskGroup implements Serializable {
-  private final String jobId;
-  private final String taskGroupId;
-  private final int taskGroupIdx;
-  private final List<PhysicalStageEdge> taskGroupIncomingEdges;
-  private final List<PhysicalStageEdge> taskGroupOutgoingEdges;
-  private final int attemptIdx;
-  private final String containerType;
-  private final byte[] serializedTaskGroupDag;
-  private final Map<String, Readable> logicalTaskIdToReadable;
-
-  /**
-   * Constructor.
-   *
-   * @param jobId                   the id of the job.
-   * @param serializedTaskGroupDag  the serialized DAG of the task group.
-   * @param taskGroupId             the ID of the scheduled task group.
-   * @param taskGroupIncomingEdges  the incoming edges of the task group.
-   * @param taskGroupOutgoingEdges  the outgoing edges of the task group.
-   * @param attemptIdx              the attempt index.
-   * @param containerType           the type of container to execute the task group on.
-   * @param logicalTaskIdToReadable the map between logical task ID and readable.
-   */
-  public ScheduledTaskGroup(final String jobId,
-                            final byte[] serializedTaskGroupDag,
-                            final String taskGroupId,
-                            final List<PhysicalStageEdge> taskGroupIncomingEdges,
-                            final List<PhysicalStageEdge> taskGroupOutgoingEdges,
-                            final int attemptIdx,
-                            final String containerType,
-                            final Map<String, Readable> logicalTaskIdToReadable) {
-    this.jobId = jobId;
-    this.taskGroupId = taskGroupId;
-    this.taskGroupIdx = RuntimeIdGenerator.getIndexFromTaskGroupId(taskGroupId);
-    this.taskGroupIncomingEdges = taskGroupIncomingEdges;
-    this.taskGroupOutgoingEdges = taskGroupOutgoingEdges;
-    this.attemptIdx = attemptIdx;
-    this.containerType = containerType;
-    this.serializedTaskGroupDag = serializedTaskGroupDag;
-    this.logicalTaskIdToReadable = logicalTaskIdToReadable;
-  }
-
-  /**
-   * @return the id of the job.
-   */
-  public String getJobId() {
-    return jobId;
-  }
-
-  /**
-   * @return the serialized DAG of the task group.
-   */
-  public byte[] getSerializedTaskGroupDag() {
-    return serializedTaskGroupDag;
-  }
-
-  /**
-   * @return the ID of the scheduled task group.
-   */
-  public String getTaskGroupId() {
-    return taskGroupId;
-  }
-
-  /**
-   * @return the idx of the scheduled task group.
-   */
-  public int getTaskGroupIdx() {
-    return taskGroupIdx;
-  }
-
-  /**
-   * @return the incoming edges of the taskGroup.
-   */
-  public List<PhysicalStageEdge> getTaskGroupIncomingEdges() {
-    return taskGroupIncomingEdges;
-  }
-
-  /**
-   * @return the outgoing edges of the taskGroup.
-   */
-  public List<PhysicalStageEdge> getTaskGroupOutgoingEdges() {
-    return taskGroupOutgoingEdges;
-  }
-
-  /**
-   * @return the attempt index.
-   */
-  public int getAttemptIdx() {
-    return attemptIdx;
-  }
-
-  /**
-   * @return the type of container to execute the task group on.
-   */
-  public String getContainerType() {
-    return containerType;
-  }
-
-  /**
-   * @return the map between logical task ID and readable.
-   */
-  public Map<String, Readable> getLogicalTaskIdToReadable() {
-    return logicalTaskIdToReadable;
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
index 2c375a0..8d241e1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/physical/Task.java
@@ -19,7 +19,7 @@ import edu.snu.nemo.common.dag.Vertex;
 
 /**
  * Task.
- * The index value is identical to the TaskGroup's index it belongs to.
+ * The index value is identical to the Task's index it belongs to.
  */
 public abstract class Task extends Vertex {
   private final String irVertexId;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
index b092b1f..5731e49 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/BlockState.java
@@ -34,14 +34,14 @@ public final class BlockState {
     stateMachineBuilder.addState(State.READY, "The block is ready to be created.");
     stateMachineBuilder.addState(State.SCHEDULED, "The block is scheduled for creation.");
     stateMachineBuilder.addState(State.COMMITTED, "The block has been committed.");
-    stateMachineBuilder.addState(State.LOST_BEFORE_COMMIT, "The task group that produces the block is scheduled, "
+    stateMachineBuilder.addState(State.LOST_BEFORE_COMMIT, "The task that produces the block is scheduled, "
         + "but failed before committing");
     stateMachineBuilder.addState(State.REMOVED, "The block has been removed (e.g., GC-ed).");
     stateMachineBuilder.addState(State.LOST, "Block lost.");
 
     // Add transitions
     stateMachineBuilder.addTransition(State.READY, State.SCHEDULED,
-        "The task group that produces the block is scheduled.");
+        "The task that produces the block is scheduled.");
     stateMachineBuilder.addTransition(State.SCHEDULED, State.COMMITTED, "Successfully moved and committed");
     stateMachineBuilder.addTransition(State.SCHEDULED, State.LOST_BEFORE_COMMIT, "The block is lost before commit");
     stateMachineBuilder.addTransition(State.COMMITTED, State.LOST, "Lost after committed");
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
index f9fd273..81286c1 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/JobState.java
@@ -24,10 +24,10 @@ public final class JobState {
   private final StateMachine stateMachine;
 
   public JobState() {
-    stateMachine = buildTaskGroupStateMachine();
+    stateMachine = buildTaskStateMachine();
   }
 
-  private StateMachine buildTaskGroupStateMachine() {
+  private StateMachine buildTaskStateMachine() {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
index 42592ad..6e2a3d5 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/StageState.java
@@ -24,31 +24,31 @@ public final class StageState {
   private final StateMachine stateMachine;
 
   public StageState() {
-    stateMachine = buildTaskGroupStateMachine();
+    stateMachine = buildTaskStateMachine();
   }
 
-  private StateMachine buildTaskGroupStateMachine() {
+  private StateMachine buildTaskStateMachine() {
     final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
 
     // Add states
     stateMachineBuilder.addState(State.READY, "The stage has been created.");
     stateMachineBuilder.addState(State.EXECUTING, "The stage is executing.");
-    stateMachineBuilder.addState(State.COMPLETE, "All of this stage's task groups have completed.");
+    stateMachineBuilder.addState(State.COMPLETE, "All of this stage's tasks have completed.");
     stateMachineBuilder.addState(State.FAILED_RECOVERABLE, "Stage failed, but is recoverable.");
     stateMachineBuilder.addState(State.FAILED_UNRECOVERABLE, "Stage failed, and is unrecoverable. The job will fail.");
 
     // Add transitions
     stateMachineBuilder.addTransition(State.READY, State.EXECUTING,
-        "The stage can now schedule its task groups");
+        "The stage can now schedule its tasks");
     stateMachineBuilder.addTransition(State.READY, State.FAILED_UNRECOVERABLE,
         "Job Failure");
 
     stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE,
-        "All task groups complete");
+        "All tasks complete");
     stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_UNRECOVERABLE,
-        "Unrecoverable failure in a task group");
+        "Unrecoverable failure in a task");
     stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_RECOVERABLE,
-        "Recoverable failure in a task group");
+        "Recoverable failure in a task");
 
     stateMachineBuilder.addTransition(State.COMPLETE, State.FAILED_RECOVERABLE,
         "Container on which the stage's output is stored failed");
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskGroupState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskGroupState.java
deleted file mode 100644
index 56967e1..0000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskGroupState.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Copyright (C) 2017 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.state;
-
-import edu.snu.nemo.common.StateMachine;
-
-/**
- * Represents the states and their transitions of a
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup}.
- */
-public final class TaskGroupState {
-  private final StateMachine stateMachine;
-
-  public TaskGroupState() {
-    stateMachine = buildTaskGroupStateMachine();
-  }
-
-  private StateMachine buildTaskGroupStateMachine() {
-    final StateMachine.Builder stateMachineBuilder = StateMachine.newBuilder();
-
-    // Add states
-    stateMachineBuilder.addState(State.READY, "The task group has been created.");
-    stateMachineBuilder.addState(State.EXECUTING, "The task group is executing.");
-    stateMachineBuilder.addState(State.COMPLETE, "All of this task group's tasks have completed.");
-    stateMachineBuilder.addState(State.FAILED_RECOVERABLE, "Task group failed, but is recoverable.");
-    stateMachineBuilder.addState(State.FAILED_UNRECOVERABLE,
-        "Task group failed, and is unrecoverable. The job will fail.");
-    stateMachineBuilder.addState(State.ON_HOLD, "The task group is paused for dynamic optimization.");
-
-
-    // Add transitions
-    stateMachineBuilder.addTransition(State.READY, State.EXECUTING,
-        "Scheduling to executor");
-    stateMachineBuilder.addTransition(State.READY, State.FAILED_RECOVERABLE,
-        "Stage Failure by a recoverable failure in another task group");
-    stateMachineBuilder.addTransition(State.READY, State.FAILED_UNRECOVERABLE,
-        "Stage Failure");
-
-    stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE,
-        "All tasks complete");
-    stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_UNRECOVERABLE,
-        "Unrecoverable failure in a task/Executor failure");
-    stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_RECOVERABLE,
-        "Recoverable failure in a task/Container failure");
-    stateMachineBuilder.addTransition(State.EXECUTING, State.ON_HOLD, "Task group paused for dynamic optimization");
-    stateMachineBuilder.addTransition(State.ON_HOLD, State.COMPLETE, "Task group completed after dynamic optimization");
-
-    stateMachineBuilder.addTransition(State.COMPLETE, State.FAILED_RECOVERABLE,
-        "Recoverable failure in a task/Container failure");
-
-    stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.READY,
-        "Recoverable task group failure");
-    stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.FAILED_UNRECOVERABLE,
-        "");
-
-    stateMachineBuilder.setInitialState(State.READY);
-
-    return stateMachineBuilder.build();
-  }
-
-  public StateMachine getStateMachine() {
-    return stateMachine;
-  }
-
-  /**
-   * TaskGroupState.
-   */
-  public enum State {
-    READY,
-    EXECUTING,
-    COMPLETE,
-    FAILED_RECOVERABLE,
-    FAILED_UNRECOVERABLE,
-    ON_HOLD, // for dynamic optimization
-  }
-
-  /**
-   * Causes of a recoverable failure.
-   */
-  public enum RecoverableFailureCause {
-    INPUT_READ_FAILURE, // Occurs when a task is unable to read its input block
-    OUTPUT_WRITE_FAILURE, // Occurs when a task successfully generates its output, but is able to write it
-    CONTAINER_FAILURE // When a REEF evaluator fails
-  }
-
-  @Override
-  public String toString() {
-    final StringBuffer sb = new StringBuffer();
-    sb.append(stateMachine.getCurrentState());
-    return sb.toString();
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
index bb438a7..08a9a66 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/state/TaskState.java
@@ -18,7 +18,8 @@ package edu.snu.nemo.runtime.common.state;
 import edu.snu.nemo.common.StateMachine;
 
 /**
- * Represents the states and their transitions of a {@link edu.snu.nemo.runtime.common.plan.physical.Task}.
+ * Represents the states and their transitions of a
+ * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask}.
  */
 public final class TaskState {
   private final StateMachine stateMachine;
@@ -32,48 +33,37 @@ public final class TaskState {
 
     // Add states
     stateMachineBuilder.addState(State.READY, "The task has been created.");
-    stateMachineBuilder.addState(State.PENDING_IN_EXECUTOR, "The task is pending in its executor.");
     stateMachineBuilder.addState(State.EXECUTING, "The task is executing.");
-    stateMachineBuilder.addState(State.COMPLETE, "The task's execution is complete with its output committed.");
+    stateMachineBuilder.addState(State.COMPLETE, "The task has completed.");
     stateMachineBuilder.addState(State.FAILED_RECOVERABLE, "Task failed, but is recoverable.");
-    stateMachineBuilder.addState(State.FAILED_UNRECOVERABLE, "Task failed, and is unrecoverable. The job will fail.");
+    stateMachineBuilder.addState(State.FAILED_UNRECOVERABLE,
+        "Task failed, and is unrecoverable. The job will fail.");
     stateMachineBuilder.addState(State.ON_HOLD, "The task is paused for dynamic optimization.");
 
     // Add transitions
-    stateMachineBuilder.addTransition(State.READY, State.PENDING_IN_EXECUTOR,
-        "Scheduling for execution");
-    stateMachineBuilder.addTransition(State.READY, State.FAILED_UNRECOVERABLE,
-        "Unrecoverable TaskGroup Failure");
+    stateMachineBuilder.addTransition(State.READY, State.EXECUTING,
+        "Scheduling to executor");
     stateMachineBuilder.addTransition(State.READY, State.FAILED_RECOVERABLE,
-        "Recoverable TaskGroup Failure");
-
-    stateMachineBuilder.addTransition(State.PENDING_IN_EXECUTOR, State.EXECUTING,
-        "Begin executing!");
-    stateMachineBuilder.addTransition(State.PENDING_IN_EXECUTOR, State.FAILED_UNRECOVERABLE,
-        "Unrecoverable TaskGroup Failure/Executor Failure");
-    stateMachineBuilder.addTransition(State.PENDING_IN_EXECUTOR, State.FAILED_RECOVERABLE,
-        "Recoverable TaskGroup Failure/Container Failure");
+        "Stage Failure by a recoverable failure in another task");
+    stateMachineBuilder.addTransition(State.READY, State.FAILED_UNRECOVERABLE,
+        "Stage Failure");
 
     stateMachineBuilder.addTransition(State.EXECUTING, State.COMPLETE,
-        "Task computation done");
+        "All tasks complete");
     stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_UNRECOVERABLE,
-        "Unexpected failure/Executor Failure");
+        "Unrecoverable failure in a task/Executor failure");
     stateMachineBuilder.addTransition(State.EXECUTING, State.FAILED_RECOVERABLE,
-        "Container Failure");
+        "Recoverable failure in a task/Container failure");
     stateMachineBuilder.addTransition(State.EXECUTING, State.ON_HOLD, "Task paused for dynamic optimization");
-
     stateMachineBuilder.addTransition(State.ON_HOLD, State.COMPLETE, "Task completed after dynamic optimization");
 
-    stateMachineBuilder.addTransition(State.COMPLETE, State.FAILED_UNRECOVERABLE,
-        "Executor Failure");
-
     stateMachineBuilder.addTransition(State.COMPLETE, State.FAILED_RECOVERABLE,
-        "Container Failure");
+        "Recoverable failure in a task/Container failure");
 
+    stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.READY,
+        "Recovered from failure and is ready");
     stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.FAILED_UNRECOVERABLE,
         "");
-    stateMachineBuilder.addTransition(State.FAILED_RECOVERABLE, State.READY,
-        "Recoverable Task Failure");
 
     stateMachineBuilder.setInitialState(State.READY);
 
@@ -89,8 +79,6 @@ public final class TaskState {
    */
   public enum State {
     READY,
-    // PENDING_IN_EXECUTOR and EXECUTING states are only managed in executor.
-    PENDING_IN_EXECUTOR,
     EXECUTING,
     COMPLETE,
     FAILED_RECOVERABLE,
@@ -98,6 +86,15 @@ public final class TaskState {
     ON_HOLD, // for dynamic optimization
   }
 
+  /**
+   * Causes of a recoverable failure.
+   */
+  public enum RecoverableFailureCause {
+    INPUT_READ_FAILURE, // Occurs when a task is unable to read its input block
+    OUTPUT_WRITE_FAILURE, // Occurs when a task successfully generates its output, but is unable to write it
+    CONTAINER_FAILURE // When a REEF evaluator fails
+  }
+
   @Override
   public String toString() {
     final StringBuffer sb = new StringBuffer();
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index f3ae7b8..c266bab 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -20,8 +20,8 @@ option java_package = "edu.snu.nemo.runtime.common.comm";
 option java_outer_classname = "ControlMessage";
 
 enum MessageType {
-    TaskGroupStateChanged = 0;
-    ScheduleTaskGroup = 1;
+    TaskStateChanged = 0;
+    ScheduleTask = 1;
     BlockStateChanged = 2;
     DataSizeMetric = 3;
     RequestBlockLocation = 4;
@@ -34,8 +34,8 @@ message Message {
     required MessageType type = 1;
     required int64 id = 2;
     required string listenerId = 3; // The id of the message listner (handler).
-    optional TaskGroupStateChangedMsg taskGroupStateChangedMsg = 4;
-    optional ScheduleTaskGroupMsg scheduleTaskGroupMsg = 5;
+    optional TaskStateChangedMsg taskStateChangedMsg = 4;
+    optional ScheduleTaskMsg scheduleTaskMsg = 5;
     optional BlockStateChangedMsg blockStateChangedMsg = 6;
     optional DataSizeMetricMsg dataSizeMetricMsg = 7;
     optional RequestBlockLocationMsg requestBlockLocationMsg = 8;
@@ -46,8 +46,8 @@ message Message {
 }
 
 // Messages from Master to Executors
-message ScheduleTaskGroupMsg {
-    required bytes taskGroup = 1;
+message ScheduleTaskMsg {
+    required bytes task = 1;
 }
 
 message BlockLocationInfoMsg {
@@ -58,10 +58,10 @@ message BlockLocationInfoMsg {
 }
 
 // Messages from Executors to Master
-message TaskGroupStateChangedMsg {
+message TaskStateChangedMsg {
     required string executorId = 1;
-    required string taskGroupId = 2;
-    required TaskGroupStateFromExecutor state = 3;
+    required string taskId = 2;
+    required TaskStateFromExecutor state = 3;
     optional string taskPutOnHoldId = 4;
     optional RecoverableFailureCause failureCause = 5;
     required int32 attemptIdx = 6;
@@ -131,7 +131,7 @@ message ByteTransferContextDescriptor {
     optional bytes keyRange = 4;
 }
 
-enum TaskGroupStateFromExecutor {
+enum TaskStateFromExecutor {
     READY = 0;
     EXECUTING = 1;
     COMPLETE = 2;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
index 06a07d2..8fd5281 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/Executor.java
@@ -27,7 +27,7 @@ import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.common.plan.physical.Task;
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
@@ -49,7 +49,7 @@ public final class Executor {
   private final String executorId;
 
   /**
-   * To be used for a thread pool to execute task groups.
+   * To be used for a thread pool to execute tasks.
    */
   private final ExecutorService executorService;
 
@@ -87,36 +87,35 @@ public final class Executor {
     return executorId;
   }
 
-  private synchronized void onTaskGroupReceived(final ScheduledTaskGroup scheduledTaskGroup) {
-    LOG.debug("Executor [{}] received TaskGroup [{}] to execute.",
-        new Object[]{executorId, scheduledTaskGroup.getTaskGroupId()});
-    executorService.execute(() -> launchTaskGroup(scheduledTaskGroup));
+  private synchronized void onTaskReceived(final ScheduledTask scheduledTask) {
+    LOG.debug("Executor [{}] received Task [{}] to execute.",
+        new Object[]{executorId, scheduledTask.getTaskId()});
+    executorService.execute(() -> launchTask(scheduledTask));
   }
 
   /**
-   * Launches the TaskGroup, and keeps track of the execution state with taskGroupStateManager.
-   * @param scheduledTaskGroup to launch.
+   * Launches the Task, and keeps track of the execution state with taskStateManager.
+   * @param scheduledTask to launch.
    */
-  private void launchTaskGroup(final ScheduledTaskGroup scheduledTaskGroup) {
+  private void launchTask(final ScheduledTask scheduledTask) {
     try {
-      final DAG<Task, RuntimeEdge<Task>> taskGroupDag =
-          SerializationUtils.deserialize(scheduledTaskGroup.getSerializedTaskGroupDag());
-      final TaskGroupStateManager taskGroupStateManager =
-          new TaskGroupStateManager(scheduledTaskGroup, taskGroupDag, executorId,
+      final DAG<Task, RuntimeEdge<Task>> taskDag =
+          SerializationUtils.deserialize(scheduledTask.getSerializedTaskDag());
+      final TaskStateManager taskStateManager =
+          new TaskStateManager(scheduledTask, taskDag, executorId,
               persistentConnectionToMasterMap, metricMessageSender);
 
-      scheduledTaskGroup.getTaskGroupIncomingEdges()
+      scheduledTask.getTaskIncomingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      scheduledTaskGroup.getTaskGroupOutgoingEdges()
+      scheduledTask.getTaskOutgoingEdges()
           .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
-      // TODO #432: remove these coders when we "streamize" task execution within a TaskGroup.
-      taskGroupDag.getVertices().forEach(v -> {
-        taskGroupDag.getOutgoingEdgesOf(v)
+      taskDag.getVertices().forEach(v -> {
+        taskDag.getOutgoingEdgesOf(v)
             .forEach(e -> serializerManager.register(e.getId(), e.getCoder(), e.getExecutionProperties()));
       });
 
-      new TaskGroupExecutor(
-          scheduledTaskGroup, taskGroupDag, taskGroupStateManager, dataTransferFactory, metricMessageSender).execute();
+      new TaskExecutor(
+          scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender).execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
           ControlMessage.Message.newBuilder()
@@ -149,11 +148,11 @@ public final class Executor {
     @Override
     public void onMessage(final ControlMessage.Message message) {
       switch (message.getType()) {
-      case ScheduleTaskGroup:
-        final ControlMessage.ScheduleTaskGroupMsg scheduleTaskGroupMsg = message.getScheduleTaskGroupMsg();
-        final ScheduledTaskGroup scheduledTaskGroup =
-            SerializationUtils.deserialize(scheduleTaskGroupMsg.getTaskGroup().toByteArray());
-        onTaskGroupReceived(scheduledTaskGroup);
+      case ScheduleTask:
+        final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
+        final ScheduledTask scheduledTask =
+            SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
+        onTaskReceived(scheduledTask);
         break;
       default:
         throw new IllegalMessageException(
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
similarity index 87%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
index cefef0a..c68215a 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskExecutor.java
@@ -26,7 +26,7 @@ import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.*;
 
@@ -38,15 +38,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Executes a task group.
+ * Executes a task.
  */
-public final class TaskGroupExecutor {
-  private static final Logger LOG = LoggerFactory.getLogger(TaskGroupExecutor.class.getName());
+public final class TaskExecutor {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());
 
-  private final DAG<Task, RuntimeEdge<Task>> taskGroupDag;
-  private final String taskGroupId;
-  private final int taskGroupIdx;
-  private final TaskGroupStateManager taskGroupStateManager;
+  private final DAG<Task, RuntimeEdge<Task>> taskDag;
+  private final String taskId;
+  private final int taskIdx;
+  private final TaskStateManager taskStateManager;
   private final List<PhysicalStageEdge> stageIncomingEdges;
   private final List<PhysicalStageEdge> stageOutgoingEdges;
   private final DataTransferFactory channelFactory;
@@ -76,24 +76,24 @@ public final class TaskGroupExecutor {
 
   /**
    * Constructor.
-   * @param scheduledTaskGroup TaskGroup with information needed during execution.
-   * @param taskGroupDag TaskGroup expressed as a DAG of Tasks.
-   * @param taskGroupStateManager State manager for this TaskGroup.
+   * @param scheduledTask Task with information needed during execution.
+   * @param taskDag A DAG of Tasks.
+   * @param taskStateManager State manager for this Task.
    * @param channelFactory For reading from/writing to data to other Stages.
    * @param metricMessageSender For sending metric with execution stats to Master.
    */
-  public TaskGroupExecutor(final ScheduledTaskGroup scheduledTaskGroup,
-                           final DAG<Task, RuntimeEdge<Task>> taskGroupDag,
-                           final TaskGroupStateManager taskGroupStateManager,
-                           final DataTransferFactory channelFactory,
-                           final MetricMessageSender metricMessageSender) {
-    this.taskGroupDag = taskGroupDag;
-    this.taskGroupId = scheduledTaskGroup.getTaskGroupId();
-    this.taskGroupIdx = scheduledTaskGroup.getTaskGroupIdx();
-    this.taskGroupStateManager = taskGroupStateManager;
-    this.stageIncomingEdges = scheduledTaskGroup.getTaskGroupIncomingEdges();
-    this.stageOutgoingEdges = scheduledTaskGroup.getTaskGroupOutgoingEdges();
-    this.logicalTaskIdToReadable = scheduledTaskGroup.getLogicalTaskIdToReadable();
+  public TaskExecutor(final ScheduledTask scheduledTask,
+                      final DAG<Task, RuntimeEdge<Task>> taskDag,
+                      final TaskStateManager taskStateManager,
+                      final DataTransferFactory channelFactory,
+                      final MetricMessageSender metricMessageSender) {
+    this.taskDag = taskDag;
+    this.taskId = scheduledTask.getTaskId();
+    this.taskIdx = scheduledTask.getTaskIdx();
+    this.taskStateManager = taskStateManager;
+    this.stageIncomingEdges = scheduledTask.getTaskIncomingEdges();
+    this.stageOutgoingEdges = scheduledTask.getTaskOutgoingEdges();
+    this.logicalTaskIdToReadable = scheduledTask.getLogicalTaskIdToReadable();
     this.channelFactory = channelFactory;
     this.metricCollector = new MetricCollector(metricMessageSender);
     this.logicalTaskIdPutOnHold = null;
@@ -119,25 +119,25 @@ public final class TaskGroupExecutor {
   }
 
   /**
-   * Initializes this TaskGroup before execution.
-   * 1) Create and connect reader/writers for both inter-TaskGroup data and intra-TaskGroup data.
+   * Initializes this Task before execution.
+   * 1) Create and connect reader/writers for both inter-Task data and intra-Task data.
    * 2) Prepares Transforms if needed.
    */
   private void initialize() {
     // Initialize data read of SourceVertex.
-    taskGroupDag.getTopologicalSort().stream()
+    taskDag.getTopologicalSort().stream()
         .filter(task -> task instanceof BoundedSourceTask)
         .forEach(boundedSourceTask -> ((BoundedSourceTask) boundedSourceTask).setReadable(
             logicalTaskIdToReadable.get(boundedSourceTask.getId())));
 
     // Initialize data handlers for each Task.
-    taskGroupDag.topologicalDo(task -> taskDataHandlers.add(new TaskDataHandler(task)));
+    taskDag.topologicalDo(task -> taskDataHandlers.add(new TaskDataHandler(task)));
 
     // Initialize data transfer.
     // Construct a pointer-based DAG of TaskDataHandlers that are used for data transfer.
     // 'Pointer-based' means that it isn't Map/List-based in getting the data structure or parent/children
     // to avoid element-wise extra overhead of calculating hash values(HashMap) or iterating Lists.
-    taskGroupDag.topologicalDo(task -> {
+    taskDag.topologicalDo(task -> {
       final Set<PhysicalStageEdge> inEdgesFromOtherStages = getInEdgesFromOtherStages(task);
       final Set<PhysicalStageEdge> outEdgesToOtherStages = getOutEdgesToOtherStages(task);
       final TaskDataHandler dataHandler = getTaskDataHandler(task);
@@ -145,14 +145,14 @@ public final class TaskGroupExecutor {
       // Set data handlers of children tasks.
       // This forms a pointer-based DAG of TaskDataHandlers.
       final List<TaskDataHandler> childrenDataHandlers = new ArrayList<>();
-      taskGroupDag.getChildren(task.getId()).forEach(child ->
+      taskDag.getChildren(task.getId()).forEach(child ->
           childrenDataHandlers.add(getTaskDataHandler(child)));
       dataHandler.setChildrenDataHandler(childrenDataHandlers);
 
       // Add InputReaders for inter-stage data transfer
       inEdgesFromOtherStages.forEach(physicalStageEdge -> {
         final InputReader inputReader = channelFactory.createReader(
-            taskGroupIdx, physicalStageEdge.getSrcVertex(), physicalStageEdge);
+            taskIdx, physicalStageEdge.getSrcVertex(), physicalStageEdge);
 
         // For InputReaders that have side input, collect them separately.
         if (inputReader.isSideInputReader()) {
@@ -167,7 +167,7 @@ public final class TaskGroupExecutor {
       // Add OutputWriters for inter-stage data transfer
       outEdgesToOtherStages.forEach(physicalStageEdge -> {
         final OutputWriter outputWriter = channelFactory.createWriter(
-            task, taskGroupIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
+            task, taskIdx, physicalStageEdge.getDstVertex(), physicalStageEdge);
         dataHandler.addOutputWriter(outputWriter);
       });
 
@@ -179,7 +179,7 @@ public final class TaskGroupExecutor {
     });
 
     // Prepare Transforms if needed.
-    taskGroupDag.topologicalDo(task -> {
+    taskDag.topologicalDo(task -> {
       if (task instanceof OperatorTask) {
         final Transform transform = ((OperatorTask) task).getTransform();
         final Map<Transform, Object> sideInputMap = new HashMap<>();
@@ -230,7 +230,7 @@ public final class TaskGroupExecutor {
    * @param task the Task to add input OutputCollectors to.
    */
   private void addInputFromThisStage(final Task task, final TaskDataHandler dataHandler) {
-    List<Task> parentTasks = taskGroupDag.getParents(task.getId());
+    List<Task> parentTasks = taskDag.getParents(task.getId());
     final String physicalTaskId = getPhysicalTaskId(task.getId());
 
     if (parentTasks != null) {
@@ -257,7 +257,7 @@ public final class TaskGroupExecutor {
     final OutputCollectorImpl outputCollector = new OutputCollectorImpl();
     final String physicalTaskId = getPhysicalTaskId(task.getId());
 
-    taskGroupDag.getOutgoingEdgesOf(task).forEach(outEdge -> {
+    taskDag.getOutgoingEdgesOf(task).forEach(outEdge -> {
       if (outEdge.isSideInput()) {
         outputCollector.setSideInputRuntimeEdge(outEdge);
         outputCollector.setAsSideInputFor(physicalTaskId);
@@ -279,7 +279,7 @@ public final class TaskGroupExecutor {
 
   /**
    * If the given task is MetricCollectionBarrierTask,
-   * set task as put on hold and use it to decide TaskGroup state when TaskGroup finishes.
+   * set task as put on hold and use it to decide Task state when Task finishes.
    *
    * @param task the task to check whether it has OutputWriters.
    * @return true if the task has OutputWriters.
@@ -290,7 +290,7 @@ public final class TaskGroupExecutor {
   }
 
   /**
-   * Finalize the output write of this TaskGroup.
+   * Finalize the output write of this Task.
    * As element-wise output write is done and the block is in memory,
    * flush the block into the designated data store and commit it.
    *
@@ -319,7 +319,7 @@ public final class TaskGroupExecutor {
    * Get input iterator from BoundedSource and bind it with id.
    */
   private void prepareInputFromSource() {
-    taskGroupDag.topologicalDo(task -> {
+    taskDag.topologicalDo(task -> {
       if (task instanceof BoundedSourceTask) {
         try {
           final String iteratorId = generateIteratorId();
@@ -328,14 +328,14 @@ public final class TaskGroupExecutor {
           srcIteratorIdToDataHandlersMap.putIfAbsent(iteratorId, new ArrayList<>());
           srcIteratorIdToDataHandlersMap.get(iteratorId).add(getTaskDataHandler(task));
         } catch (final BlockFetchException ex) {
-          taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
-              Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
+          taskStateManager.onTaskStateChanged(TaskState.State.FAILED_RECOVERABLE,
+              Optional.empty(), Optional.of(TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
           LOG.error("{} Execution Failed (Recoverable: input read failure)! Exception: {}",
-              taskGroupId, ex.toString());
+              taskId, ex.toString());
         } catch (final Exception e) {
-          taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
+          taskStateManager.onTaskStateChanged(TaskState.State.FAILED_UNRECOVERABLE,
               Optional.empty(), Optional.empty());
-          LOG.error("{} Execution Failed! Exception: {}", taskGroupId, e.toString());
+          LOG.error("{} Execution Failed! Exception: {}", taskId, e.toString());
           throw new RuntimeException(e);
         }
       }
@@ -374,12 +374,12 @@ public final class TaskGroupExecutor {
   }
 
   /**
-   * Check whether all tasks in this TaskGroup are finished.
+   * Check whether all tasks in this Task are finished.
    *
    * @return true if all tasks are finished.
    */
   private boolean finishedAllTasks() {
-    // Total number of Tasks in this TaskGroup
+    // Total number of Tasks
     int taskNum = taskDataHandlers.size();
     int finishedTaskNum = finishedTaskIds.size();
 
@@ -496,22 +496,22 @@ public final class TaskGroupExecutor {
   }
 
   /**
-   * Executes the task group.
+   * Executes the task.
    */
   public void execute() {
     final Map<String, Object> metric = new HashMap<>();
-    metricCollector.beginMeasurement(taskGroupId, metric);
+    metricCollector.beginMeasurement(taskId, metric);
     long boundedSrcReadStartTime = 0;
     long boundedSrcReadEndTime = 0;
     long inputReadStartTime = 0;
     long inputReadEndTime = 0;
     if (isExecutionRequested) {
-      throw new RuntimeException("TaskGroup {" + taskGroupId + "} execution called again!");
+      throw new RuntimeException("Task {" + taskId + "} execution called again!");
     } else {
       isExecutionRequested = true;
     }
-    taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.EXECUTING, Optional.empty(), Optional.empty());
-    LOG.info("{} Executing!", taskGroupId);
+    taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
+    LOG.info("{} Executing!", taskId);
 
     // Prepare input data from bounded source.
     boundedSrcReadStartTime = System.currentTimeMillis();
@@ -523,7 +523,7 @@ public final class TaskGroupExecutor {
     inputReadStartTime = System.currentTimeMillis();
     prepareInputFromOtherStages();
 
-    // Execute the TaskGroup DAG.
+    // Execute the Task DAG.
     try {
       srcIteratorIdToDataHandlersMap.forEach((srcIteratorId, dataHandlers) -> {
         Iterator iterator = idToSrcIteratorMap.get(srcIteratorId);
@@ -565,8 +565,8 @@ public final class TaskGroupExecutor {
       inputReadEndTime = System.currentTimeMillis();
       metric.put("InputReadTime(ms)", inputReadEndTime - inputReadStartTime);
 
-      // Process intra-TaskGroup data.
-      // Intra-TaskGroup data comes from outputCollectors of this TaskGroup's Tasks.
+      // Process intra-Task data.
+      // Intra-Task data comes from outputCollectors of this Task's Tasks.
       initializeOutputToChildrenDataHandlersMap();
       while (!finishedAllTasks()) {
         outputToChildrenDataHandlersMap.forEach((outputCollector, childrenDataHandlers) -> {
@@ -609,30 +609,30 @@ public final class TaskGroupExecutor {
         updateOutputToChildrenDataHandlersMap();
       }
     } catch (final BlockWriteException ex2) {
-      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_RECOVERABLE,
-          Optional.empty(), Optional.of(TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
+      taskStateManager.onTaskStateChanged(TaskState.State.FAILED_RECOVERABLE,
+          Optional.empty(), Optional.of(TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
       LOG.error("{} Execution Failed (Recoverable: output write failure)! Exception: {}",
-          taskGroupId, ex2.toString());
+          taskId, ex2.toString());
     } catch (final Exception e) {
-      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.FAILED_UNRECOVERABLE,
+      taskStateManager.onTaskStateChanged(TaskState.State.FAILED_UNRECOVERABLE,
           Optional.empty(), Optional.empty());
       LOG.error("{} Execution Failed! Exception: {}",
-          taskGroupId, e.toString());
+          taskId, e.toString());
       throw new RuntimeException(e);
     }
 
-    // Put TaskGroup-unit metrics.
+    // Put Task-unit metrics.
     final boolean available = serBlockSize >= 0;
     putReadBytesMetric(available, serBlockSize, encodedBlockSize, metric);
-    metricCollector.endMeasurement(taskGroupId, metric);
+    metricCollector.endMeasurement(taskId, metric);
     if (logicalTaskIdPutOnHold == null) {
-      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.COMPLETE, Optional.empty(), Optional.empty());
+      taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
     } else {
-      taskGroupStateManager.onTaskGroupStateChanged(TaskGroupState.State.ON_HOLD,
+      taskStateManager.onTaskStateChanged(TaskState.State.ON_HOLD,
           Optional.of(logicalTaskIdPutOnHold),
           Optional.empty());
     }
-    LOG.info("{} Complete!", taskGroupId);
+    LOG.info("{} Complete!", taskId);
   }
 
   /**
@@ -695,7 +695,7 @@ public final class TaskGroupExecutor {
    * @return the physical task id.
    */
   private String getPhysicalTaskId(final String logicalTaskId) {
-    return RuntimeIdGenerator.generatePhysicalTaskId(taskGroupIdx, logicalTaskId);
+    return RuntimeIdGenerator.generatePhysicalTaskId(taskIdx, logicalTaskId);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
similarity index 59%
rename from runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java
rename to runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 81ed432..77d5136 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskGroupStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -23,82 +23,82 @@ import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.common.plan.physical.Task;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
 
 import java.util.*;
 
+import edu.snu.nemo.runtime.common.state.TaskState;
 import org.apache.reef.annotations.audience.EvaluatorSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Manages the states related to a task group.
+ * Manages the states related to a task.
  * The methods of this class are synchronized.
  */
 @EvaluatorSide
-public final class TaskGroupStateManager {
-  private static final Logger LOG = LoggerFactory.getLogger(TaskGroupStateManager.class.getName());
+public final class TaskStateManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TaskStateManager.class.getName());
 
-  private final String taskGroupId;
+  private final String taskId;
   private final int attemptIdx;
   private final String executorId;
   private final MetricCollector metricCollector;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
-  public TaskGroupStateManager(final ScheduledTaskGroup scheduledTaskGroup,
-                               final DAG<Task, RuntimeEdge<Task>> taskGroupDag,
+  public TaskStateManager(final ScheduledTask scheduledTask,
+                               final DAG<Task, RuntimeEdge<Task>> taskDag,
                                final String executorId,
                                final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                                final MetricMessageSender metricMessageSender) {
-    this.taskGroupId = scheduledTaskGroup.getTaskGroupId();
-    this.attemptIdx = scheduledTaskGroup.getAttemptIdx();
+    this.taskId = scheduledTask.getTaskId();
+    this.attemptIdx = scheduledTask.getAttemptIdx();
     this.executorId = executorId;
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     this.metricCollector = new MetricCollector(metricMessageSender);
   }
 
   /**
-   * Updates the state of the task group.
-   * @param newState of the task group.
+   * Updates the state of the task.
+   * @param newState of the task.
    * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
    * @param cause only provided as non-empty upon recoverable failures.
    */
-  public synchronized void onTaskGroupStateChanged(final TaskGroupState.State newState,
-                                                   final Optional<String> taskPutOnHold,
-                                                   final Optional<TaskGroupState.RecoverableFailureCause> cause) {
+  public synchronized void onTaskStateChanged(final TaskState.State newState,
+                                              final Optional<String> taskPutOnHold,
+                                              final Optional<TaskState.RecoverableFailureCause> cause) {
     final Map<String, Object> metric = new HashMap<>();
 
     switch (newState) {
       case EXECUTING:
-        LOG.debug("Executing TaskGroup ID {}...", this.taskGroupId);
+        LOG.debug("Executing Task ID {}...", this.taskId);
         metric.put("ContainerId", executorId);
         metric.put("ScheduleAttempt", attemptIdx);
         metric.put("FromState", newState);
-        metricCollector.beginMeasurement(taskGroupId, metric);
+        metricCollector.beginMeasurement(taskId, metric);
         break;
       case COMPLETE:
-        LOG.debug("TaskGroup ID {} complete!", this.taskGroupId);
+        LOG.debug("Task ID {} complete!", this.taskId);
         metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskGroupId, metric);
-        notifyTaskGroupStateToMaster(newState, Optional.empty(), cause);
+        metricCollector.endMeasurement(taskId, metric);
+        notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case FAILED_RECOVERABLE:
-        LOG.debug("TaskGroup ID {} failed (recoverable).", this.taskGroupId);
+        LOG.debug("Task ID {} failed (recoverable).", this.taskId);
         metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskGroupId, metric);
-        notifyTaskGroupStateToMaster(newState, Optional.empty(), cause);
+        metricCollector.endMeasurement(taskId, metric);
+        notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case FAILED_UNRECOVERABLE:
-        LOG.debug("TaskGroup ID {} failed (unrecoverable).", this.taskGroupId);
+        LOG.debug("Task ID {} failed (unrecoverable).", this.taskId);
         metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskGroupId, metric);
-        notifyTaskGroupStateToMaster(newState, Optional.empty(), cause);
+        metricCollector.endMeasurement(taskId, metric);
+        notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case ON_HOLD:
-        LOG.debug("TaskGroup ID {} put on hold.", this.taskGroupId);
-        notifyTaskGroupStateToMaster(newState, taskPutOnHold, cause);
+        LOG.debug("Task ID {} put on hold.", this.taskId);
+        notifyTaskStateToMaster(newState, taskPutOnHold, cause);
         break;
       default:
         throw new IllegalStateException("Illegal state at this point");
@@ -106,18 +106,18 @@ public final class TaskGroupStateManager {
   }
 
   /**
-   * Notifies the change in task group state to master.
-   * @param newState of the task group.
+   * Notifies the change in task state to master.
+   * @param newState of the task.
    * @param taskPutOnHold the logical ID of the tasks put on hold, empty otherwise.
    * @param cause only provided as non-empty upon recoverable failures.
    */
-  private void notifyTaskGroupStateToMaster(final TaskGroupState.State newState,
+  private void notifyTaskStateToMaster(final TaskState.State newState,
                                             final Optional<String> taskPutOnHold,
-                                            final Optional<TaskGroupState.RecoverableFailureCause> cause) {
-    final ControlMessage.TaskGroupStateChangedMsg.Builder msgBuilder =
-        ControlMessage.TaskGroupStateChangedMsg.newBuilder()
+                                            final Optional<TaskState.RecoverableFailureCause> cause) {
+    final ControlMessage.TaskStateChangedMsg.Builder msgBuilder =
+        ControlMessage.TaskStateChangedMsg.newBuilder()
           .setExecutorId(executorId)
-          .setTaskGroupId(taskGroupId)
+          .setTaskId(taskId)
           .setAttemptIdx(attemptIdx)
           .setState(convertState(newState));
     if (taskPutOnHold.isPresent()) {
@@ -127,37 +127,37 @@ public final class TaskGroupStateManager {
       msgBuilder.setFailureCause(convertFailureCause(cause.get()));
     }
 
-    // Send taskGroupStateChangedMsg to master!
+    // Send taskStateChangedMsg to master!
     persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
         ControlMessage.Message.newBuilder()
             .setId(RuntimeIdGenerator.generateMessageId())
             .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-            .setType(ControlMessage.MessageType.TaskGroupStateChanged)
-            .setTaskGroupStateChangedMsg(msgBuilder.build())
+            .setType(ControlMessage.MessageType.TaskStateChanged)
+            .setTaskStateChangedMsg(msgBuilder.build())
             .build());
   }
 
-  private ControlMessage.TaskGroupStateFromExecutor convertState(final TaskGroupState.State state) {
+  private ControlMessage.TaskStateFromExecutor convertState(final TaskState.State state) {
     switch (state) {
     case READY:
-      return ControlMessage.TaskGroupStateFromExecutor.READY;
+      return ControlMessage.TaskStateFromExecutor.READY;
     case EXECUTING:
-      return ControlMessage.TaskGroupStateFromExecutor.EXECUTING;
+      return ControlMessage.TaskStateFromExecutor.EXECUTING;
     case COMPLETE:
-      return ControlMessage.TaskGroupStateFromExecutor.COMPLETE;
+      return ControlMessage.TaskStateFromExecutor.COMPLETE;
     case FAILED_RECOVERABLE:
-      return ControlMessage.TaskGroupStateFromExecutor.FAILED_RECOVERABLE;
+      return ControlMessage.TaskStateFromExecutor.FAILED_RECOVERABLE;
     case FAILED_UNRECOVERABLE:
-      return ControlMessage.TaskGroupStateFromExecutor.FAILED_UNRECOVERABLE;
+      return ControlMessage.TaskStateFromExecutor.FAILED_UNRECOVERABLE;
     case ON_HOLD:
-      return ControlMessage.TaskGroupStateFromExecutor.ON_HOLD;
+      return ControlMessage.TaskStateFromExecutor.ON_HOLD;
     default:
-      throw new UnknownExecutionStateException(new Exception("This TaskGroupState is unknown: " + state));
+      throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
     }
   }
 
   private ControlMessage.RecoverableFailureCause convertFailureCause(
-    final TaskGroupState.RecoverableFailureCause cause) {
+    final TaskState.RecoverableFailureCause cause) {
     switch (cause) {
     case INPUT_READ_FAILURE:
       return ControlMessage.RecoverableFailureCause.InputReadFailure;
@@ -170,6 +170,6 @@ public final class TaskGroupStateManager {
   }
 
   // Tentative
-  public void getCurrentTaskGroupExecutionState() {
+  public void getCurrentTaskExecutionState() {
   }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 0d7e51b..3ca2402 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -115,7 +115,7 @@ public final class InputReader extends DataTransfer {
   private List<CompletableFuture<DataUtil.IteratorWithNumBytes>> readDataInRange() {
     assert (runtimeEdge instanceof PhysicalStageEdge);
     final KeyRange hashRangeToRead =
-        ((PhysicalStageEdge) runtimeEdge).getTaskGroupIdxToKeyRange().get(dstTaskIndex);
+        ((PhysicalStageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
           new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index 62cbc22..0588769 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -80,7 +80,7 @@ public final class OutputCollectorImpl<O> implements OutputCollector<O> {
   }
 
   /**
-   * Mark this edge as side input so that TaskGroupExecutor can retrieve
+   * Mark this edge as side input so that TaskExecutor can retrieve
    * source transform using it.
    *
    * @param edge the RuntimeEdge to mark as side input.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
index 523d837..56f5e5b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/TaskDataHandler.java
@@ -23,7 +23,7 @@ import java.util.List;
 /**
  * Per-Task data handler.
  * This is a wrapper class that handles data transfer of a Task.
- * As TaskGroup input is processed element-wise, Task output element percolates down
+ * As Task input is processed element-wise, Task output element percolates down
  * through the DAG of children TaskDataHandlers.
  */
 public final class TaskDataHandler {
@@ -69,7 +69,7 @@ public final class TaskDataHandler {
   }
 
   /**
-   * Get side input from other TaskGroup.
+   * Get side input from other Task.
    *
    * @return InputReader that has side input.
    */
@@ -78,8 +78,8 @@ public final class TaskDataHandler {
   }
 
   /**
-   * Get intra-TaskGroup side input from parent tasks.
-   * Just like normal intra-TaskGroup inputs, intra-TaskGroup side inputs are
+   * Get intra-Task side input from parent tasks.
+   * Just like normal intra-Task inputs, intra-Task side inputs are
    * collected in parent tasks' OutputCollectors.
    *
    * @return OutputCollectors of all parent tasks which are marked as having side input.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
index 2cd2c47..d8e0b1e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/BlockManagerMaster.java
@@ -52,7 +52,7 @@ import static edu.snu.nemo.runtime.common.state.BlockState.State.SCHEDULED;
 public final class BlockManagerMaster {
   private static final Logger LOG = LoggerFactory.getLogger(BlockManagerMaster.class.getName());
   private final Map<String, BlockMetadata> blockIdToMetadata;
-  private final Map<String, Set<String>> producerTaskGroupIdToBlockIds;
+  private final Map<String, Set<String>> producerTaskIdToBlockIds;
   // A lock that can be acquired exclusively or not.
   // Because the BlockMetadata itself is sufficiently synchronized,
   // operation that runs in a single block can just acquire a (sharable) read lock.
@@ -70,7 +70,7 @@ public final class BlockManagerMaster {
     masterMessageEnvironment.setupListener(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID,
         new PartitionManagerMasterControlMessageReceiver());
     this.blockIdToMetadata = new HashMap<>();
-    this.producerTaskGroupIdToBlockIds = new HashMap<>();
+    this.producerTaskIdToBlockIds = new HashMap<>();
     this.lock = new ReentrantReadWriteLock();
   }
 
@@ -78,17 +78,17 @@ public final class BlockManagerMaster {
    * Initializes the states of a block which will be produced by producer task(s).
    *
    * @param blockId             the id of the block to initialize.
-   * @param producerTaskGroupId the id of the producer task group.
+   * @param producerTaskId the id of the producer task.
    */
   @VisibleForTesting
   public void initializeState(final String blockId,
-                              final String producerTaskGroupId) {
+                              final String producerTaskId) {
     final Lock writeLock = lock.writeLock();
     writeLock.lock();
     try {
       blockIdToMetadata.put(blockId, new BlockMetadata(blockId));
-      producerTaskGroupIdToBlockIds.putIfAbsent(producerTaskGroupId, new HashSet<>());
-      producerTaskGroupIdToBlockIds.get(producerTaskGroupId).add(blockId);
+      producerTaskIdToBlockIds.putIfAbsent(producerTaskId, new HashSet<>());
+      producerTaskIdToBlockIds.get(producerTaskId).add(blockId);
     } finally {
       writeLock.unlock();
     }
@@ -98,10 +98,10 @@ public final class BlockManagerMaster {
    * Manages the block information when a executor is removed.
    *
    * @param executorId the id of removed executor.
-   * @return the set of task groups have to be recomputed.
+   * @return the set of tasks have to be recomputed.
    */
   public Set<String> removeWorker(final String executorId) {
-    final Set<String> taskGroupsToRecompute = new HashSet<>();
+    final Set<String> tasksToRecompute = new HashSet<>();
     LOG.warn("Worker {} is removed.", new Object[]{executorId});
 
     final Lock writeLock = lock.writeLock();
@@ -110,12 +110,12 @@ public final class BlockManagerMaster {
       // Set committed block states to lost
       getCommittedBlocksByWorker(executorId).forEach(blockId -> {
         onBlockStateChanged(blockId, BlockState.State.LOST, executorId);
-        // producerTaskGroupForPartition should always be non-empty.
-        final Set<String> producerTaskGroupForPartition = getProducerTaskGroupIds(blockId);
-        producerTaskGroupForPartition.forEach(taskGroupsToRecompute::add);
+        // producerTaskForPartition should always be non-empty.
+        final Set<String> producerTaskForPartition = getProducerTaskIds(blockId);
+        producerTaskForPartition.forEach(tasksToRecompute::add);
       });
 
-      return taskGroupsToRecompute;
+      return tasksToRecompute;
     } finally {
       writeLock.unlock();
     }
@@ -154,66 +154,66 @@ public final class BlockManagerMaster {
   }
 
   /**
-   * Gets the ids of the task groups which already produced or will produce data for a specific block.
+   * Gets the ids of the tasks which already produced or will produce data for a specific block.
    *
    * @param blockId the id of the block.
-   * @return the ids of the producer task groups.
+   * @return the ids of the producer tasks.
    */
   @VisibleForTesting
-  public Set<String> getProducerTaskGroupIds(final String blockId) {
+  public Set<String> getProducerTaskIds(final String blockId) {
     final Lock readLock = lock.readLock();
     readLock.lock();
     try {
-      final Set<String> producerTaskGroupIds = new HashSet<>();
-      for (Map.Entry<String, Set<String>> entry : producerTaskGroupIdToBlockIds.entrySet()) {
+      final Set<String> producerTaskIds = new HashSet<>();
+      for (Map.Entry<String, Set<String>> entry : producerTaskIdToBlockIds.entrySet()) {
         if (entry.getValue().contains(blockId)) {
-          producerTaskGroupIds.add(entry.getKey());
+          producerTaskIds.add(entry.getKey());
         }
       }
 
-      return producerTaskGroupIds;
+      return producerTaskIds;
     } finally {
       readLock.unlock();
     }
   }
 
   /**
-   * To be called when a potential producer task group is scheduled.
-   * To be precise, it is called when the task group is enqueued to
-   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection}.
+   * To be called when a potential producer task is scheduled.
+   * To be precise, it is called when the task is enqueued to
+   * {@link edu.snu.nemo.runtime.master.scheduler.PendingTaskCollection}.
    *
-   * @param scheduledTaskGroupId the ID of the scheduled task group.
+   * @param scheduledTaskId the ID of the scheduled task.
    */
-  public void onProducerTaskGroupScheduled(final String scheduledTaskGroupId) {
+  public void onProducerTaskScheduled(final String scheduledTaskId) {
     final Lock writeLock = lock.writeLock();
     writeLock.lock();
     try {
-      if (producerTaskGroupIdToBlockIds.containsKey(scheduledTaskGroupId)) {
-        producerTaskGroupIdToBlockIds.get(scheduledTaskGroupId).forEach(blockId -> {
+      if (producerTaskIdToBlockIds.containsKey(scheduledTaskId)) {
+        producerTaskIdToBlockIds.get(scheduledTaskId).forEach(blockId -> {
           if (!blockIdToMetadata.get(blockId).getBlockState()
               .getStateMachine().getCurrentState().equals(SCHEDULED)) {
             onBlockStateChanged(blockId, SCHEDULED, null);
           }
         });
-      } // else this task group does not produce any block
+      } // else this task does not produce any block
     } finally {
       writeLock.unlock();
     }
   }
 
   /**
-   * To be called when a potential producer task group fails.
-   * Only the TaskGroups that have not yet completed (i.e. blocks not yet committed) will call this method.
+   * To be called when a potential producer task fails.
+   * Only the Tasks that have not yet completed (i.e. blocks not yet committed) will call this method.
    *
-   * @param failedTaskGroupId the ID of the task group that failed.
+   * @param failedTaskId the ID of the task that failed.
    */
-  public void onProducerTaskGroupFailed(final String failedTaskGroupId) {
+  public void onProducerTaskFailed(final String failedTaskId) {
     final Lock writeLock = lock.writeLock();
     writeLock.lock();
     try {
-      if (producerTaskGroupIdToBlockIds.containsKey(failedTaskGroupId)) {
-        LOG.info("ProducerTaskGroup {} failed for a list of blocks:", failedTaskGroupId);
-        producerTaskGroupIdToBlockIds.get(failedTaskGroupId).forEach(blockId -> {
+      if (producerTaskIdToBlockIds.containsKey(failedTaskId)) {
+        LOG.info("ProducerTask {} failed for a list of blocks:", failedTaskId);
+        producerTaskIdToBlockIds.get(failedTaskId).forEach(blockId -> {
           final BlockState.State state = (BlockState.State)
               blockIdToMetadata.get(blockId).getBlockState().getStateMachine().getCurrentState();
           if (state == BlockState.State.COMMITTED) {
@@ -224,7 +224,7 @@ public final class BlockManagerMaster {
             onBlockStateChanged(blockId, BlockState.State.LOST_BEFORE_COMMIT, null);
           }
         });
-      } // else this task group does not produce any block
+      } // else this task does not produce any block
     } finally {
       writeLock.unlock();
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 6e0877e..61f7d6e 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -26,7 +26,6 @@ import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.StageState;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
 
 import java.io.File;
 import java.io.IOException;
@@ -37,6 +36,7 @@ import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import edu.snu.nemo.runtime.common.state.TaskState;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,7 +63,7 @@ public final class JobStateManager {
    */
   private final JobState jobState;
   private final Map<String, StageState> idToStageStates;
-  private final Map<String, TaskGroupState> idToTaskGroupStates;
+  private final Map<String, TaskState> idToTaskStates;
 
   /**
    * Keeps track of the number of schedule attempts for each stage.
@@ -77,11 +77,11 @@ public final class JobStateManager {
 
   /**
    * Used to track stage completion status.
-   * All task group ids are added to the set when the a stage begins executing.
-   * Each task group id is removed upon completion,
+   * All task ids are added to the set when the a stage begins executing.
+   * Each task id is removed upon completion,
    * therefore indicating the stage's completion when this set becomes empty.
    */
-  private final Map<String, Set<String>> stageIdToRemainingTaskGroupSet;
+  private final Map<String, Set<String>> stageIdToRemainingTaskSet;
 
   /**
    * Used to track job completion status.
@@ -110,9 +110,9 @@ public final class JobStateManager {
     this.maxScheduleAttempt = maxScheduleAttempt;
     this.jobState = new JobState();
     this.idToStageStates = new HashMap<>();
-    this.idToTaskGroupStates = new HashMap<>();
+    this.idToTaskStates = new HashMap<>();
     this.scheduleAttemptIdxByStage = new HashMap<>();
-    this.stageIdToRemainingTaskGroupSet = new HashMap<>();
+    this.stageIdToRemainingTaskSet = new HashMap<>();
     this.currentJobStageIds = new HashSet<>();
     this.finishLock = new ReentrantLock();
     this.jobFinishedCondition = finishLock.newCondition();
@@ -122,7 +122,7 @@ public final class JobStateManager {
   }
 
   /**
-   * Initializes the states for the job/stages/taskgroups/tasks for this job.
+   * Initializes the states for the job/stages/tasks for this job.
    */
   private void initializeComputationStates() {
     onJobStateChanged(JobState.State.EXECUTING);
@@ -131,8 +131,8 @@ public final class JobStateManager {
     physicalPlan.getStageDAG().topologicalDo(physicalStage -> {
       currentJobStageIds.add(physicalStage.getId());
       idToStageStates.put(physicalStage.getId(), new StageState());
-      physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-        idToTaskGroupStates.put(taskGroupId, new TaskGroupState());
+      physicalStage.getTaskIds().forEach(taskId -> {
+        idToTaskStates.put(taskId, new TaskState());
       });
     });
   }
@@ -140,27 +140,27 @@ public final class JobStateManager {
   private void initializePartitionStates(final BlockManagerMaster blockManagerMaster) {
     final DAG<PhysicalStage, PhysicalStageEdge> stageDAG = physicalPlan.getStageDAG();
     stageDAG.topologicalDo(physicalStage -> {
-      final List<String> taskGroupIdsForStage = physicalStage.getTaskGroupIds();
+      final List<String> taskIdsForStage = physicalStage.getTaskIds();
       final List<PhysicalStageEdge> stageOutgoingEdges = stageDAG.getOutgoingEdgesOf(physicalStage);
 
       // Initialize states for blocks of inter-stage edges
       stageOutgoingEdges.forEach(physicalStageEdge -> {
-        final int srcParallelism = taskGroupIdsForStage.size();
+        final int srcParallelism = taskIdsForStage.size();
         IntStream.range(0, srcParallelism).forEach(srcTaskIdx -> {
           final String blockId = RuntimeIdGenerator.generateBlockId(physicalStageEdge.getId(), srcTaskIdx);
-          blockManagerMaster.initializeState(blockId, taskGroupIdsForStage.get(srcTaskIdx));
+          blockManagerMaster.initializeState(blockId, taskIdsForStage.get(srcTaskIdx));
         });
       });
 
       // Initialize states for blocks of stage internal edges
-      taskGroupIdsForStage.forEach(taskGroupId -> {
-        final DAG<Task, RuntimeEdge<Task>> taskGroupInternalDag = physicalStage.getTaskGroupDag();
-        taskGroupInternalDag.getVertices().forEach(task -> {
-          final List<RuntimeEdge<Task>> internalOutgoingEdges = taskGroupInternalDag.getOutgoingEdgesOf(task);
+      taskIdsForStage.forEach(taskId -> {
+        final DAG<Task, RuntimeEdge<Task>> taskInternalDag = physicalStage.getTaskDag();
+        taskInternalDag.getVertices().forEach(task -> {
+          final List<RuntimeEdge<Task>> internalOutgoingEdges = taskInternalDag.getOutgoingEdgesOf(task);
           internalOutgoingEdges.forEach(taskRuntimeEdge -> {
-            final int srcTaskIdx = RuntimeIdGenerator.getIndexFromTaskGroupId(taskGroupId);
+            final int srcTaskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
             final String blockId = RuntimeIdGenerator.generateBlockId(taskRuntimeEdge.getId(), srcTaskIdx);
-            blockManagerMaster.initializeState(blockId, taskGroupId);
+            blockManagerMaster.initializeState(blockId, taskId);
           });
         });
       });
@@ -229,14 +229,14 @@ public final class JobStateManager {
       beginMeasurement(stageId, metric);
 
       // if there exists a mapping, this state change is from a failed_recoverable stage,
-      // and there may be task groups that do not need to be re-executed.
-      if (!stageIdToRemainingTaskGroupSet.containsKey(stageId)) {
+      // and there may be tasks that do not need to be re-executed.
+      if (!stageIdToRemainingTaskSet.containsKey(stageId)) {
         for (final PhysicalStage stage : physicalPlan.getStageDAG().getVertices()) {
           if (stage.getId().equals(stageId)) {
-            Set<String> remainingTaskGroupIds = new HashSet<>();
-            remainingTaskGroupIds.addAll(
-                stage.getTaskGroupIds().stream().collect(Collectors.toSet()));
-            stageIdToRemainingTaskGroupSet.put(stageId, remainingTaskGroupIds);
+            Set<String> remainingTaskIds = new HashSet<>();
+            remainingTaskIds.addAll(
+                stage.getTaskIds().stream().collect(Collectors.toSet()));
+            stageIdToRemainingTaskSet.put(stageId, remainingTaskIds);
             break;
           }
         }
@@ -260,39 +260,37 @@ public final class JobStateManager {
   }
 
   /**
-   * Updates the state of a task group.
-   * Task group state changes can occur both in master and executor.
+   * Updates the state of a task.
+   * Task state changes can occur both in master and executor.
    * State changes that occur in master are
    * initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}.
    * State changes that occur in executors are sent to master as a control message,
    * and the call to this method is initiated in {@link edu.snu.nemo.runtime.master.scheduler.BatchSingleJobScheduler}
    * when the message/event is received.
-   * A task group completion implies completion of all its tasks.
    *
-   * @param taskGroupId  the ID of the task group.
-   * @param newState     the new state of the task group.
+   * @param taskId  the ID of the task.
+   * @param newState     the new state of the task.
    */
-  public synchronized void onTaskGroupStateChanged(final String taskGroupId,
-                                                   final TaskGroupState.State newState) {
-    final StateMachine taskGroupState = idToTaskGroupStates.get(taskGroupId).getStateMachine();
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
-    LOG.debug("Task Group State Transition: id {}, from {} to {}",
-        new Object[]{taskGroupId, taskGroupState.getCurrentState(), newState});
+  public synchronized void onTaskStateChanged(final String taskId, final TaskState.State newState) {
+    final StateMachine taskState = idToTaskStates.get(taskId).getStateMachine();
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
+    LOG.debug("Task State Transition: id {}, from {} to {}",
+        new Object[]{taskId, taskState.getCurrentState(), newState});
     final Map<String, Object> metric = new HashMap<>();
 
     switch (newState) {
     case ON_HOLD:
     case COMPLETE:
-      taskGroupState.setState(newState);
+      taskState.setState(newState);
       metric.put("ToState", newState);
-      endMeasurement(taskGroupId, metric);
+      endMeasurement(taskId, metric);
 
-      if (stageIdToRemainingTaskGroupSet.containsKey(stageId)) {
-        final Set<String> remainingTaskGroups = stageIdToRemainingTaskGroupSet.get(stageId);
-        LOG.info("{}: {} TaskGroup(s) to go", stageId, remainingTaskGroups.size());
-        remainingTaskGroups.remove(taskGroupId);
+      if (stageIdToRemainingTaskSet.containsKey(stageId)) {
+        final Set<String> remainingTasks = stageIdToRemainingTaskSet.get(stageId);
+        LOG.info("{}: {} Task(s) to go", stageId, remainingTasks.size());
+        remainingTasks.remove(taskId);
 
-        if (remainingTaskGroups.isEmpty()) {
+        if (remainingTasks.isEmpty()) {
           onStageStateChanged(stageId, StageState.State.COMPLETE);
         }
       } else {
@@ -301,50 +299,50 @@ public final class JobStateManager {
       }
       break;
     case EXECUTING:
-      taskGroupState.setState(newState);
+      taskState.setState(newState);
       metric.put("FromState", newState);
-      beginMeasurement(taskGroupId, metric);
+      beginMeasurement(taskId, metric);
       break;
     case FAILED_RECOVERABLE:
-      // Multiple calls to set a task group's state to failed_recoverable can occur when
-      // a task group is made failed_recoverable early by another task group's failure detection in the same stage
-      // and the task group finds itself failed_recoverable later, propagating the state change event only then.
-      if (taskGroupState.getCurrentState() != TaskGroupState.State.FAILED_RECOVERABLE) {
-        taskGroupState.setState(newState);
+      // Multiple calls to set a task's state to failed_recoverable can occur when
+      // a task is made failed_recoverable early by another task's failure detection in the same stage
+      // and the task finds itself failed_recoverable later, propagating the state change event only then.
+      if (taskState.getCurrentState() != TaskState.State.FAILED_RECOVERABLE) {
+        taskState.setState(newState);
         metric.put("ToState", newState);
-        endMeasurement(taskGroupId, metric);
+        endMeasurement(taskId, metric);
 
-        // Mark this stage as failed_recoverable as long as it contains at least one failed_recoverable task group
+        // Mark this stage as failed_recoverable as long as it contains at least one failed_recoverable task
         if (idToStageStates.get(stageId).getStateMachine().getCurrentState() != StageState.State.FAILED_RECOVERABLE) {
           onStageStateChanged(stageId, StageState.State.FAILED_RECOVERABLE);
         }
 
-        if (stageIdToRemainingTaskGroupSet.containsKey(stageId)) {
-          stageIdToRemainingTaskGroupSet.get(stageId).add(taskGroupId);
+        if (stageIdToRemainingTaskSet.containsKey(stageId)) {
+          stageIdToRemainingTaskSet.get(stageId).add(taskId);
         } else {
           throw new IllegalStateTransitionException(
               new Throwable("The stage has not yet been submitted for execution"));
         }
       } else {
         LOG.info("{} state is already FAILED_RECOVERABLE. Skipping this event.",
-            taskGroupId);
+            taskId);
       }
       break;
     case READY:
-      taskGroupState.setState(newState);
+      taskState.setState(newState);
       break;
     case FAILED_UNRECOVERABLE:
-      taskGroupState.setState(newState);
+      taskState.setState(newState);
       metric.put("ToState", newState);
-      endMeasurement(taskGroupId, metric);
+      endMeasurement(taskId, metric);
       break;
     default:
-      throw new UnknownExecutionStateException(new Throwable("This task group state is unknown"));
+      throw new UnknownExecutionStateException(new Throwable("This task state is unknown"));
     }
   }
 
   public synchronized boolean checkStageCompletion(final String stageId) {
-    return stageIdToRemainingTaskGroupSet.get(stageId).isEmpty();
+    return stageIdToRemainingTaskSet.get(stageId).isEmpty();
   }
 
   public synchronized boolean checkJobTermination() {
@@ -420,12 +418,12 @@ public final class JobStateManager {
     return idToStageStates;
   }
 
-  public synchronized TaskGroupState getTaskGroupState(final String taskGroupId) {
-    return idToTaskGroupStates.get(taskGroupId);
+  public synchronized TaskState getTaskState(final String taskId) {
+    return idToTaskStates.get(taskId);
   }
 
-  public synchronized Map<String, TaskGroupState> getIdToTaskGroupStates() {
-    return idToTaskGroupStates;
+  public synchronized Map<String, TaskState> getIdToTaskStates() {
+    return idToTaskStates;
   }
 
   /**
@@ -449,7 +447,7 @@ public final class JobStateManager {
   private void endMeasurement(final String compUnitId, final Map<String, Object> finalMetric) {
     final MetricDataBuilder metricDataBuilder = metricDataBuilderMap.get(compUnitId);
 
-    // may be null when a TaskGroup fails without entering the executing state (due to an input read failure)
+    // may be null when a Task fails without entering the executing state (due to an input read failure)
     if (metricDataBuilder != null) {
       finalMetric.put("ContainerId", "Master");
       metricDataBuilder.endMeasurement(finalMetric);
@@ -501,17 +499,17 @@ public final class JobStateManager {
       final StageState stageState = idToStageStates.get(stage.getId());
       sb.append("{\"id\": \"").append(stage.getId()).append("\", ");
       sb.append("\"state\": \"").append(stageState.toString()).append("\", ");
-      sb.append("\"taskGroups\": [");
+      sb.append("\"tasks\": [");
 
-      boolean isFirstTaskGroup = true;
-      for (final String taskGroupId : stage.getTaskGroupIds()) {
-        if (!isFirstTaskGroup) {
+      boolean isFirstTask = true;
+      for (final String taskId : stage.getTaskIds()) {
+        if (!isFirstTask) {
           sb.append(", ");
         }
-        isFirstTaskGroup = false;
-        final TaskGroupState taskGroupState = idToTaskGroupStates.get(taskGroupId);
-        sb.append("{\"id\": \"").append(taskGroupId).append("\", ");
-        sb.append("\"state\": \"").append(taskGroupState.toString()).append("\"}");
+        isFirstTask = false;
+        final TaskState taskState = idToTaskStates.get(taskId);
+        sb.append("{\"id\": \"").append(taskId).append("\", ");
+        sb.append("\"state\": \"").append(taskState.toString()).append("\"}");
       }
       sb.append("]}");
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
index b768edd..a098202 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
@@ -27,7 +27,7 @@ public interface MetricMessageHandler {
 
   /**
    * Handle the received metric message.
-   * @param metricKey a given key for the metric (ex. TaskGroup ID)
+   * @param metricKey a given key for the metric (ex. Task ID)
    * @param metricValue the metric formatted as a string (ex. JSON).
    */
   void onMetricMessageReceived(final String metricKey, final String metricValue);
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 1c4e458..e282509 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -25,7 +25,7 @@ import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
@@ -48,8 +48,8 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static edu.snu.nemo.runtime.common.state.TaskGroupState.State.COMPLETE;
-import static edu.snu.nemo.runtime.common.state.TaskGroupState.State.ON_HOLD;
+import static edu.snu.nemo.runtime.common.state.TaskState.State.COMPLETE;
+import static edu.snu.nemo.runtime.common.state.TaskState.State.ON_HOLD;
 
 /**
  * (WARNING) Use runtimeMasterThread for all public methods to avoid race conditions.
@@ -250,47 +250,47 @@ public final class RuntimeMaster {
     @Override
     public void onMessageWithContext(final ControlMessage.Message message, final MessageContext messageContext) {
       switch (message.getType()) {
-      default:
-        throw new IllegalMessageException(
-            new Exception("This message should not be requested to Master :" + message.getType()));
+        default:
+          throw new IllegalMessageException(
+              new Exception("This message should not be requested to Master :" + message.getType()));
       }
     }
   }
 
   private void handleControlMessage(final ControlMessage.Message message) {
     switch (message.getType()) {
-    case TaskGroupStateChanged:
-      final ControlMessage.TaskGroupStateChangedMsg taskGroupStateChangedMsg
-          = message.getTaskGroupStateChangedMsg();
-
-      scheduler.onTaskGroupStateChanged(taskGroupStateChangedMsg.getExecutorId(),
-          taskGroupStateChangedMsg.getTaskGroupId(),
-          convertTaskGroupState(taskGroupStateChangedMsg.getState()),
-          taskGroupStateChangedMsg.getAttemptIdx(),
-          taskGroupStateChangedMsg.getTaskPutOnHoldId(),
-          convertFailureCause(taskGroupStateChangedMsg.getFailureCause()));
-      break;
-    case ExecutorFailed:
-      // Executor failed due to user code.
-      final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
-      final String failedExecutorId = executorFailedMsg.getExecutorId();
-      final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
-      LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
-      throw new RuntimeException(exception);
-    case DataSizeMetric:
-      final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = message.getDataSizeMetricMsg();
-      // TODO #511: Refactor metric aggregation for (general) run-rime optimization.
-      accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeList(),
-          dataSizeMetricMsg.getSrcIRVertexId(), dataSizeMetricMsg.getBlockId());
-      break;
-    case MetricMessageReceived:
-      final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
-      metricList.forEach(metric ->
-          metricMessageHandler.onMetricMessageReceived(metric.getMetricKey(), metric.getMetricValue()));
-      break;
-    default:
-      throw new IllegalMessageException(
-          new Exception("This message should not be received by Master :" + message.getType()));
+      case TaskStateChanged:
+        final ControlMessage.TaskStateChangedMsg taskStateChangedMsg
+            = message.getTaskStateChangedMsg();
+
+        scheduler.onTaskStateChanged(taskStateChangedMsg.getExecutorId(),
+            taskStateChangedMsg.getTaskId(),
+            convertTaskState(taskStateChangedMsg.getState()),
+            taskStateChangedMsg.getAttemptIdx(),
+            taskStateChangedMsg.getTaskPutOnHoldId(),
+            convertFailureCause(taskStateChangedMsg.getFailureCause()));
+        break;
+      case ExecutorFailed:
+        // Executor failed due to user code.
+        final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
+        final String failedExecutorId = executorFailedMsg.getExecutorId();
+        final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
+        LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
+        throw new RuntimeException(exception);
+      case DataSizeMetric:
+        final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = message.getDataSizeMetricMsg();
+        // TODO #511: Refactor metric aggregation for (general) run-rime optimization.
+        accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeList(),
+            dataSizeMetricMsg.getSrcIRVertexId(), dataSizeMetricMsg.getBlockId());
+        break;
+      case MetricMessageReceived:
+        final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
+        metricList.forEach(metric ->
+            metricMessageHandler.onMetricMessageReceived(metric.getMetricKey(), metric.getMetricValue()));
+        break;
+      default:
+        throw new IllegalMessageException(
+            new Exception("This message should not be received by Master :" + message.getType()));
     }
   }
 
@@ -326,32 +326,32 @@ public final class RuntimeMaster {
   }
 
   // TODO #164: Cleanup Protobuf Usage
-  private static TaskGroupState.State convertTaskGroupState(final ControlMessage.TaskGroupStateFromExecutor state) {
+  private static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
     switch (state) {
       case READY:
-        return TaskGroupState.State.READY;
+        return TaskState.State.READY;
       case EXECUTING:
-        return TaskGroupState.State.EXECUTING;
+        return TaskState.State.EXECUTING;
       case COMPLETE:
         return COMPLETE;
       case FAILED_RECOVERABLE:
-        return TaskGroupState.State.FAILED_RECOVERABLE;
+        return TaskState.State.FAILED_RECOVERABLE;
       case FAILED_UNRECOVERABLE:
-        return TaskGroupState.State.FAILED_UNRECOVERABLE;
+        return TaskState.State.FAILED_UNRECOVERABLE;
       case ON_HOLD:
         return ON_HOLD;
       default:
-        throw new UnknownExecutionStateException(new Exception("This TaskGroupState is unknown: " + state));
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
     }
   }
 
-  private TaskGroupState.RecoverableFailureCause convertFailureCause(
+  private TaskState.RecoverableFailureCause convertFailureCause(
       final ControlMessage.RecoverableFailureCause cause) {
     switch (cause) {
       case InputReadFailure:
-        return TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE;
+        return TaskState.RecoverableFailureCause.INPUT_READ_FAILURE;
       case OutputWriteFailure:
-        return TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE;
+        return TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE;
       default:
         throw new UnknownFailureCauseException(
             new Throwable("The failure cause for the recoverable failure is unknown"));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index c76e64a..d9641f4 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageSender;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.driver.context.ActiveContext;
 
@@ -36,7 +36,7 @@ import java.util.concurrent.ExecutorService;
  * Such information may include:
  *    a) The executor's resource type.
  *    b) The executor's capacity (ex. number of cores).
- *    c) Task groups scheduled/launched for the executor.
+ *    c) Tasks scheduled/launched for the executor.
  *    d) Name of the physical node which hosts this executor.
  *    e) (Please add other information as we implement more features).
  */
@@ -45,9 +45,9 @@ public final class ExecutorRepresenter {
 
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
-  private final Set<String> runningTaskGroups;
-  private final Set<String> completeTaskGroups;
-  private final Set<String> failedTaskGroups;
+  private final Set<String> runningTasks;
+  private final Set<String> completeTasks;
+  private final Set<String> failedTasks;
   private final MessageSender<ControlMessage.Message> messageSender;
   private final ActiveContext activeContext;
   private final ExecutorService serializationExecutorService;
@@ -71,42 +71,42 @@ public final class ExecutorRepresenter {
     this.executorId = executorId;
     this.resourceSpecification = resourceSpecification;
     this.messageSender = messageSender;
-    this.runningTaskGroups = new HashSet<>();
-    this.completeTaskGroups = new HashSet<>();
-    this.failedTaskGroups = new HashSet<>();
+    this.runningTasks = new HashSet<>();
+    this.completeTasks = new HashSet<>();
+    this.failedTasks = new HashSet<>();
     this.activeContext = activeContext;
     this.serializationExecutorService = serializationExecutorService;
     this.nodeName = nodeName;
   }
 
   /**
-   * Marks all TaskGroups which were running in this executor as failed.
+   * Marks all Tasks which were running in this executor as failed.
    */
   public void onExecutorFailed() {
-    runningTaskGroups.forEach(taskGroupId -> failedTaskGroups.add(taskGroupId));
-    runningTaskGroups.clear();
+    runningTasks.forEach(taskId -> failedTasks.add(taskId));
+    runningTasks.clear();
   }
 
   /**
-   * Marks the TaskGroup as running, and sends scheduling message to the executor.
-   * @param scheduledTaskGroup
+   * Marks the Task as running, and sends scheduling message to the executor.
+   * @param scheduledTask
    */
-  public void onTaskGroupScheduled(final ScheduledTaskGroup scheduledTaskGroup) {
-    runningTaskGroups.add(scheduledTaskGroup.getTaskGroupId());
-    failedTaskGroups.remove(scheduledTaskGroup.getTaskGroupId());
+  public void onTaskScheduled(final ScheduledTask scheduledTask) {
+    runningTasks.add(scheduledTask.getTaskId());
+    failedTasks.remove(scheduledTask.getTaskId());
 
     serializationExecutorService.submit(new Runnable() {
       @Override
       public void run() {
-        final byte[] serialized = SerializationUtils.serialize(scheduledTaskGroup);
+        final byte[] serialized = SerializationUtils.serialize(scheduledTask);
         sendControlMessage(
             ControlMessage.Message.newBuilder()
                 .setId(RuntimeIdGenerator.generateMessageId())
                 .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
-                .setType(ControlMessage.MessageType.ScheduleTaskGroup)
-                .setScheduleTaskGroupMsg(
-                    ControlMessage.ScheduleTaskGroupMsg.newBuilder()
-                        .setTaskGroup(ByteString.copyFrom(serialized))
+                .setType(ControlMessage.MessageType.ScheduleTask)
+                .setScheduleTaskMsg(
+                    ControlMessage.ScheduleTaskMsg.newBuilder()
+                        .setTask(ByteString.copyFrom(serialized))
                         .build())
                 .build());
       }
@@ -122,49 +122,49 @@ public final class ExecutorRepresenter {
   }
 
   /**
-   * Marks the specified TaskGroup as completed.
-   * @param taskGroupId id of the TaskGroup
+   * Marks the specified Task as completed.
+   * @param taskId id of the Task
    */
-  public void onTaskGroupExecutionComplete(final String taskGroupId) {
-    runningTaskGroups.remove(taskGroupId);
-    completeTaskGroups.add(taskGroupId);
+  public void onTaskExecutionComplete(final String taskId) {
+    runningTasks.remove(taskId);
+    completeTasks.add(taskId);
   }
 
   /**
-   * Marks the specified TaskGroup as failed.
-   * @param taskGroupId id of the TaskGroup
+   * Marks the specified Task as failed.
+   * @param taskId id of the Task
    */
-  public void onTaskGroupExecutionFailed(final String taskGroupId) {
-    runningTaskGroups.remove(taskGroupId);
-    failedTaskGroups.add(taskGroupId);
+  public void onTaskExecutionFailed(final String taskId) {
+    runningTasks.remove(taskId);
+    failedTasks.add(taskId);
   }
 
   /**
-   * @return how many TaskGroups can this executor simultaneously run
+   * @return how many Tasks can this executor simultaneously run
    */
   public int getExecutorCapacity() {
     return resourceSpecification.getCapacity();
   }
 
   /**
-   * @return set of ids of TaskGroups that are running in this executor
+   * @return set of ids of Tasks that are running in this executor
    */
-  public Set<String> getRunningTaskGroups() {
-    return runningTaskGroups;
+  public Set<String> getRunningTasks() {
+    return runningTasks;
   }
 
   /**
-   * @return set of ids of TaskGroups that have been failed in this exeuctor
+   * @return set of ids of Tasks that have been failed in this exeuctor
    */
-  public Set<String> getFailedTaskGroups() {
-    return failedTaskGroups;
+  public Set<String> getFailedTasks() {
+    return failedTasks;
   }
 
   /**
-   * @return set of ids of TaskGroups that have been completed in this executor
+   * @return set of ids of Tasks that have been completed in this executor
    */
-  public Set<String> getCompleteTaskGroups() {
-    return completeTaskGroups;
+  public Set<String> getCompleteTasks() {
+    return completeTasks;
   }
 
   /**
@@ -199,8 +199,8 @@ public final class ExecutorRepresenter {
   public String toString() {
     final StringBuffer sb = new StringBuffer("ExecutorRepresenter{");
     sb.append("executorId='").append(executorId).append('\'');
-    sb.append(", runningTaskGroups=").append(runningTaskGroups);
-    sb.append(", failedTaskGroups=").append(failedTaskGroups);
+    sb.append(", runningTasks=").append(runningTasks);
+    sb.append(", failedTasks=").append(failedTasks);
     sb.append('}');
     return sb.toString();
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
index 577d286..6a2b2b8 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ResourceSpecification.java
@@ -44,7 +44,7 @@ public final class ResourceSpecification {
   }
 
   /**
-   * @return The number of TaskGroups that can be run in this container.
+   * @return The number of Tasks that can be run in this container.
    */
   public int getCapacity() {
     return capacity;
@@ -89,7 +89,7 @@ public final class ResourceSpecification {
     }
 
     /**
-     * @param inputCapacity the number of TaskGroups that can be run in this container
+     * @param inputCapacity the number of Tasks that can be run in this container
      * @return {@link Builder} object.
      */
     public Builder setCapacity(final int inputCapacity) {
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
index aa27c0e..bdc265d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java
@@ -22,12 +22,12 @@ import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.StageState;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -42,13 +42,13 @@ import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 
-import static edu.snu.nemo.runtime.common.state.TaskGroupState.State.ON_HOLD;
+import static edu.snu.nemo.runtime.common.state.TaskState.State.ON_HOLD;
 
 /**
  * (WARNING) Only a single dedicated thread should use the public methods of this class.
  * (i.e., runtimeMasterThread in RuntimeMaster)
  *
- * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute and schedules the TaskGroups.
+ * BatchSingleJobScheduler receives a single {@link PhysicalPlan} to execute and schedules the Tasks.
  * The policy by which it schedules them is dependent on the implementation of {@link SchedulingPolicy}.
  */
 @DriverSide
@@ -62,7 +62,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    */
   private final SchedulingPolicy schedulingPolicy;
   private final SchedulerRunner schedulerRunner;
-  private final PendingTaskGroupCollection pendingTaskGroupCollection;
+  private final PendingTaskCollection pendingTaskCollection;
   private final ExecutorRegistry executorRegistry;
 
   /**
@@ -81,14 +81,14 @@ public final class BatchSingleJobScheduler implements Scheduler {
   @Inject
   public BatchSingleJobScheduler(final SchedulingPolicy schedulingPolicy,
                                  final SchedulerRunner schedulerRunner,
-                                 final PendingTaskGroupCollection pendingTaskGroupCollection,
+                                 final PendingTaskCollection pendingTaskCollection,
                                  final BlockManagerMaster blockManagerMaster,
                                  final PubSubEventHandlerWrapper pubSubEventHandlerWrapper,
                                  final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler,
                                  final ExecutorRegistry executorRegistry) {
     this.schedulingPolicy = schedulingPolicy;
     this.schedulerRunner = schedulerRunner;
-    this.pendingTaskGroupCollection = pendingTaskGroupCollection;
+    this.pendingTaskCollection = pendingTaskCollection;
     this.blockManagerMaster = blockManagerMaster;
     this.pubSubEventHandlerWrapper = pubSubEventHandlerWrapper;
     updatePhysicalPlanEventHandler.setScheduler(this);
@@ -110,7 +110,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
     this.jobStateManager = scheduledJobStateManager;
 
     schedulerRunner.scheduleJob(scheduledJobStateManager);
-    pendingTaskGroupCollection.onJobScheduled(physicalPlan);
+    pendingTaskCollection.onJobScheduled(physicalPlan);
 
     LOG.info("Job to schedule: {}", jobToSchedule.getId());
 
@@ -127,43 +127,44 @@ public final class BatchSingleJobScheduler implements Scheduler {
     // NOTE: what's already been executed is not modified in the new physical plan.
     this.physicalPlan = newPhysicalPlan;
     if (taskInfo != null) {
-      onTaskGroupExecutionComplete(taskInfo.left(), taskInfo.right(), true);
+      onTaskExecutionComplete(taskInfo.left(), taskInfo.right(), true);
     }
   }
 
   /**
-   * Receives a {@link edu.snu.nemo.runtime.common.comm.ControlMessage.TaskGroupStateChangedMsg} from an executor.
+   * Receives a {@link edu.snu.nemo.runtime.common.comm.ControlMessage.TaskStateChangedMsg} from an executor.
    * The message is received via communicator where this method is called.
    * @param executorId the id of the executor where the message was sent from.
-   * @param taskGroupId whose state has changed
+   * @param taskId whose state has changed
    * @param newState the state to change to
    * @param taskPutOnHold the ID of task that are put on hold. It is null otherwise.
    */
   @Override
-  public void onTaskGroupStateChanged(final String executorId, final String taskGroupId,
-                                      final TaskGroupState.State newState, final int attemptIdx,
-                                      @Nullable final String taskPutOnHold,
-                                      final TaskGroupState.RecoverableFailureCause failureCause) {
+  public void onTaskStateChanged(final String executorId, final String taskId,
+                                 final TaskState.State newState, final int attemptIdx,
+                                 @Nullable final String taskPutOnHold,
+                                 final TaskState.RecoverableFailureCause failureCause) {
     switch (newState) {
-    case COMPLETE:
-      jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-      onTaskGroupExecutionComplete(executorId, taskGroupId);
-      break;
-    case FAILED_RECOVERABLE:
-      onTaskGroupExecutionFailedRecoverable(executorId, taskGroupId, attemptIdx, newState, failureCause);
-      break;
-    case ON_HOLD:
-      jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-      onTaskGroupExecutionOnHold(executorId, taskGroupId, taskPutOnHold);
-      break;
-    case FAILED_UNRECOVERABLE:
-      throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on TaskGroup #")
-          .append(taskGroupId).append(" in Executor ").append(executorId).toString()));
-    case READY:
-    case EXECUTING:
-      throw new IllegalStateTransitionException(new Exception("The states READY/EXECUTING cannot occur at this point"));
-    default:
-      throw new UnknownExecutionStateException(new Exception("This TaskGroupState is unknown: " + newState));
+      case COMPLETE:
+        jobStateManager.onTaskStateChanged(taskId, newState);
+        onTaskExecutionComplete(executorId, taskId);
+        break;
+      case FAILED_RECOVERABLE:
+        onTaskExecutionFailedRecoverable(executorId, taskId, attemptIdx, newState, failureCause);
+        break;
+      case ON_HOLD:
+        jobStateManager.onTaskStateChanged(taskId, newState);
+        onTaskExecutionOnHold(executorId, taskId, taskPutOnHold);
+        break;
+      case FAILED_UNRECOVERABLE:
+        throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The job failed on Task #")
+            .append(taskId).append(" in Executor ").append(executorId).toString()));
+      case READY:
+      case EXECUTING:
+        throw new IllegalStateTransitionException(
+            new Exception("The states READY/EXECUTING cannot occur at this point"));
+      default:
+        throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + newState));
     }
   }
 
@@ -175,34 +176,34 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   @Override
   public void onExecutorRemoved(final String executorId) {
-    final Set<String> taskGroupsToReExecute = new HashSet<>();
+    final Set<String> tasksToReExecute = new HashSet<>();
 
-    // TaskGroups for lost blocks
-    taskGroupsToReExecute.addAll(blockManagerMaster.removeWorker(executorId));
+    // Tasks for lost blocks
+    tasksToReExecute.addAll(blockManagerMaster.removeWorker(executorId));
 
-    // TaskGroups executing on the removed executor
+    // Tasks executing on the removed executor
     executorRegistry.setRepresenterAsFailed(executorId);
     final ExecutorRepresenter executor = executorRegistry.getFailedExecutorRepresenter(executorId);
     executor.onExecutorFailed();
-    taskGroupsToReExecute.addAll(executor.getFailedTaskGroups());
+    tasksToReExecute.addAll(executor.getFailedTasks());
 
-    taskGroupsToReExecute.forEach(failedTaskGroupId ->
-      onTaskGroupStateChanged(executorId, failedTaskGroupId, TaskGroupState.State.FAILED_RECOVERABLE,
-          SCHEDULE_ATTEMPT_ON_CONTAINER_FAILURE, null,
-          TaskGroupState.RecoverableFailureCause.CONTAINER_FAILURE));
+    tasksToReExecute.forEach(failedTaskId ->
+        onTaskStateChanged(executorId, failedTaskId, TaskState.State.FAILED_RECOVERABLE,
+            SCHEDULE_ATTEMPT_ON_CONTAINER_FAILURE, null,
+            TaskState.RecoverableFailureCause.CONTAINER_FAILURE));
 
-    if (!taskGroupsToReExecute.isEmpty()) {
-      // Schedule a stage after marking the necessary task groups to failed_recoverable.
-      // The stage for one of the task groups that failed is a starting point to look
+    if (!tasksToReExecute.isEmpty()) {
+      // Schedule a stage after marking the necessary tasks to failed_recoverable.
+      // The stage for one of the tasks that failed is a starting point to look
       // for the next stage to be scheduled.
-      scheduleNextStage(RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupsToReExecute.iterator().next()));
+      scheduleNextStage(RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next()));
     }
   }
 
   @Override
   public void terminate() {
     this.schedulerRunner.terminate();
-    this.pendingTaskGroupCollection.close();
+    this.pendingTaskCollection.close();
   }
 
   /**
@@ -236,7 +237,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
   }
 
   /**
-   * Selects the list of stages to schedule, in the order they must be added to {@link PendingTaskGroupCollection}.
+   * Selects the list of stages to schedule, in the order they must be added to {@link PendingTaskCollection}.
    *
    * This is a recursive function that decides which schedule group to schedule upon a stage completion, or a failure.
    * It takes the currentScheduleGroupIndex as a reference point to begin looking for the stages to execute:
@@ -252,7 +253,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
    * @param currentScheduleGroupIndex
    *      the index of the schedule group that is executing/has executed when this method is called.
    * @return an optional of the (possibly empty) list of next schedulable stages, in the order they should be
-   * enqueued to {@link PendingTaskGroupCollection}.
+   * enqueued to {@link PendingTaskCollection}.
    */
   private Optional<List<PhysicalStage>> selectNextStagesToSchedule(final int currentScheduleGroupIndex) {
     if (currentScheduleGroupIndex > initialScheduleGroup) {
@@ -321,7 +322,7 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
   /**
    * Schedules the given stage.
-   * It adds the list of task groups for the stage where the scheduler thread continuously polls from.
+   * It adds the list of tasks for the stage where the scheduler thread continuously polls from.
    * @param stageToSchedule the stage to schedule.
    */
   private void scheduleStage(final PhysicalStage stageToSchedule) {
@@ -332,38 +333,38 @@ public final class BatchSingleJobScheduler implements Scheduler {
 
     final Enum stageState = jobStateManager.getStageState(stageToSchedule.getId()).getStateMachine().getCurrentState();
 
-    final List<String> taskGroupIdsToSchedule = new LinkedList<>();
-    for (final String taskGroupId : stageToSchedule.getTaskGroupIds()) {
-      // this happens when the belonging stage's other task groups have failed recoverable,
-      // but this task group's results are safe.
-      final TaskGroupState.State taskGroupState =
-          (TaskGroupState.State)
-              jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState();
+    final List<String> taskIdsToSchedule = new LinkedList<>();
+    for (final String taskId : stageToSchedule.getTaskIds()) {
+      // this happens when the belonging stage's other tasks have failed recoverable,
+      // but this task's results are safe.
+      final TaskState.State taskState =
+          (TaskState.State)
+              jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState();
 
-      switch (taskGroupState) {
+      switch (taskState) {
         case COMPLETE:
         case EXECUTING:
-          LOG.info("Skipping {} because its outputs are safe!", taskGroupId);
+          LOG.info("Skipping {} because its outputs are safe!", taskId);
           break;
         case READY:
           if (stageState == StageState.State.FAILED_RECOVERABLE) {
             LOG.info("Skipping {} because it is already in the queue, but just hasn't been scheduled yet!",
-                taskGroupId);
+                taskId);
           } else {
-            LOG.info("Scheduling {}", taskGroupId);
-            taskGroupIdsToSchedule.add(taskGroupId);
+            LOG.info("Scheduling {}", taskId);
+            taskIdsToSchedule.add(taskId);
           }
           break;
         case FAILED_RECOVERABLE:
-          LOG.info("Re-scheduling {} for failure recovery", taskGroupId);
-          jobStateManager.onTaskGroupStateChanged(taskGroupId, TaskGroupState.State.READY);
-          taskGroupIdsToSchedule.add(taskGroupId);
+          LOG.info("Re-scheduling {} for failure recovery", taskId);
+          jobStateManager.onTaskStateChanged(taskId, TaskState.State.READY);
+          taskIdsToSchedule.add(taskId);
           break;
         case ON_HOLD:
           // Do nothing
           break;
         default:
-          throw new SchedulingException(new Throwable("Detected a FAILED_UNRECOVERABLE TaskGroup"));
+          throw new SchedulingException(new Throwable("Detected a FAILED_UNRECOVERABLE Task"));
       }
     }
     if (stageState == StageState.State.FAILED_RECOVERABLE) {
@@ -379,30 +380,30 @@ public final class BatchSingleJobScheduler implements Scheduler {
     // each readable and source task will be bounded in executor.
     final List<Map<String, Readable>> logicalTaskIdToReadables = stageToSchedule.getLogicalTaskIdToReadables();
 
-    taskGroupIdsToSchedule.forEach(taskGroupId -> {
-      blockManagerMaster.onProducerTaskGroupScheduled(taskGroupId);
-      final int taskGroupIdx = RuntimeIdGenerator.getIndexFromTaskGroupId(taskGroupId);
-      LOG.debug("Enquing {}", taskGroupId);
-      pendingTaskGroupCollection.add(new ScheduledTaskGroup(physicalPlan.getId(),
-          stageToSchedule.getSerializedTaskGroupDag(), taskGroupId, stageIncomingEdges, stageOutgoingEdges, attemptIdx,
-          stageToSchedule.getContainerType(), logicalTaskIdToReadables.get(taskGroupIdx)));
+    taskIdsToSchedule.forEach(taskId -> {
+      blockManagerMaster.onProducerTaskScheduled(taskId);
+      final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(taskId);
+      LOG.debug("Enquing {}", taskId);
+      pendingTaskCollection.add(new ScheduledTask(physicalPlan.getId(),
+          stageToSchedule.getSerializedTaskDag(), taskId, stageIncomingEdges, stageOutgoingEdges, attemptIdx,
+          stageToSchedule.getContainerType(), logicalTaskIdToReadables.get(taskIdx)));
     });
-    schedulerRunner.onATaskGroupAvailable();
+    schedulerRunner.onATaskAvailable();
   }
 
   /**
-   * Gets the DAG of a task group from it's ID.
+   * Gets the DAG of a task from it's ID.
    *
-   * @param taskGroupId the ID of the task group to get.
-   * @return the DAG of the task group.
+   * @param taskId the ID of the task to get.
+   * @return the DAG of the task.
    */
-  private DAG<Task, RuntimeEdge<Task>> getTaskGroupDagById(final String taskGroupId) {
+  private DAG<Task, RuntimeEdge<Task>> getTaskDagById(final String taskId) {
     for (final PhysicalStage physicalStage : physicalPlan.getStageDAG().getVertices()) {
-      if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId))) {
-        return physicalStage.getTaskGroupDag();
+      if (physicalStage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
+        return physicalStage.getTaskDag();
       }
     }
-    throw new RuntimeException(new Throwable("This taskGroupId does not exist in the plan"));
+    throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
   }
 
   private PhysicalStage getStageById(final String stageId) {
@@ -411,64 +412,64 @@ public final class BatchSingleJobScheduler implements Scheduler {
         return physicalStage;
       }
     }
-    throw new RuntimeException(new Throwable("This taskGroupId does not exist in the plan"));
+    throw new RuntimeException(new Throwable("This taskId does not exist in the plan"));
   }
 
   /**
-   * Action after task group execution has been completed, not after it has been put on hold.
+   * Action after task execution has been completed, not after it has been put on hold.
    *
    * @param executorId  the ID of the executor.
-   * @param taskGroupId the ID pf the task group completed.
+   * @param taskId the ID pf the task completed.
    */
-  private void onTaskGroupExecutionComplete(final String executorId,
-                                            final String taskGroupId) {
-    onTaskGroupExecutionComplete(executorId, taskGroupId, false);
+  private void onTaskExecutionComplete(final String executorId,
+                                       final String taskId) {
+    onTaskExecutionComplete(executorId, taskId, false);
   }
 
   /**
-   * Action after task group execution has been completed.
+   * Action after task execution has been completed.
    * @param executorId id of the executor.
-   * @param taskGroupId the ID of the task group completed.
+   * @param taskId the ID of the task completed.
    * @param isOnHoldToComplete whether or not if it is switched to complete after it has been on hold.
    */
-  private void onTaskGroupExecutionComplete(final String executorId,
-                                            final String taskGroupId,
-                                            final Boolean isOnHoldToComplete) {
-    LOG.debug("{} completed in {}", new Object[]{taskGroupId, executorId});
+  private void onTaskExecutionComplete(final String executorId,
+                                       final String taskId,
+                                       final Boolean isOnHoldToComplete) {
+    LOG.debug("{} completed in {}", new Object[]{taskId, executorId});
     if (!isOnHoldToComplete) {
-      executorRegistry.getRunningExecutorRepresenter(executorId).onTaskGroupExecutionComplete(taskGroupId);
+      executorRegistry.getRunningExecutorRepresenter(executorId).onTaskExecutionComplete(taskId);
     }
 
-    final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
-    if (jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion)) {
-      // if the stage this task group belongs to is complete,
+    final String stageIdForTaskUponCompletion = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
+    if (jobStateManager.checkStageCompletion(stageIdForTaskUponCompletion)) {
+      // if the stage this task belongs to is complete,
       if (!jobStateManager.checkJobTermination()) { // and if the job is not yet complete or failed,
-        scheduleNextStage(stageIdForTaskGroupUponCompletion);
+        scheduleNextStage(stageIdForTaskUponCompletion);
       }
     }
     schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
-   * Action for after task group execution is put on hold.
+   * Action for after task execution is put on hold.
    * @param executorId     the ID of the executor.
-   * @param taskGroupId    the ID of the task group.
+   * @param taskId    the ID of the task.
    * @param taskPutOnHold  the ID of task that is put on hold.
    */
-  private void onTaskGroupExecutionOnHold(final String executorId,
-                                          final String taskGroupId,
-                                          final String taskPutOnHold) {
-    LOG.info("{} put on hold in {}", new Object[]{taskGroupId, executorId});
-    executorRegistry.getRunningExecutorRepresenter(executorId).onTaskGroupExecutionComplete(taskGroupId);
-    final String stageIdForTaskGroupUponCompletion = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+  private void onTaskExecutionOnHold(final String executorId,
+                                     final String taskId,
+                                     final String taskPutOnHold) {
+    LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
+    executorRegistry.getRunningExecutorRepresenter(executorId).onTaskExecutionComplete(taskId);
+    final String stageIdForTaskUponCompletion = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
 
     final boolean stageComplete =
-        jobStateManager.checkStageCompletion(stageIdForTaskGroupUponCompletion);
+        jobStateManager.checkStageCompletion(stageIdForTaskUponCompletion);
 
     if (stageComplete) {
       // get optimization vertex from the task.
       final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-          getTaskGroupDagById(taskGroupId).getVertices().stream() // get tasks list
+          getTaskDagById(taskId).getVertices().stream() // get tasks list
               .filter(task -> task.getId().equals(taskPutOnHold)) // find it
               .map(physicalPlan::getIRVertexOf) // get the corresponding IRVertex, the MetricCollectionBarrierVertex
               .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
@@ -480,74 +481,74 @@ public final class BatchSingleJobScheduler implements Scheduler {
       // and we will use this vertex to perform metric collection and dynamic optimization.
 
       pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
-          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskGroupId)));
+          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, Pair.of(executorId, taskId)));
     } else {
-      onTaskGroupExecutionComplete(executorId, taskGroupId, true);
+      onTaskExecutionComplete(executorId, taskId, true);
     }
     schedulerRunner.onAnExecutorAvailable();
   }
 
   /**
-   * Action for after task group execution has failed but it's recoverable.
+   * Action for after task execution has failed but it's recoverable.
    * @param executorId    the ID of the executor
-   * @param taskGroupId   the ID of the task group
+   * @param taskId   the ID of the task
    * @param attemptIdx    the attempt index
    * @param newState      the state this situation
    * @param failureCause  the cause of failure
    */
-  private void onTaskGroupExecutionFailedRecoverable(final String executorId, final String taskGroupId,
-                                                     final int attemptIdx, final TaskGroupState.State newState,
-                                                     final TaskGroupState.RecoverableFailureCause failureCause) {
-    LOG.info("{} failed in {} by {}", new Object[]{taskGroupId, executorId, failureCause});
-    executorRegistry.getExecutorRepresenter(executorId).onTaskGroupExecutionFailed(taskGroupId);
+  private void onTaskExecutionFailedRecoverable(final String executorId, final String taskId,
+                                                final int attemptIdx, final TaskState.State newState,
+                                                final TaskState.RecoverableFailureCause failureCause) {
+    LOG.info("{} failed in {} by {}", new Object[]{taskId, executorId, failureCause});
+    executorRegistry.getExecutorRepresenter(executorId).onTaskExecutionFailed(taskId);
 
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId);
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(taskId);
     final int attemptIndexForStage =
-        jobStateManager.getAttemptCountForStage(RuntimeIdGenerator.getStageIdFromTaskGroupId(taskGroupId));
+        jobStateManager.getAttemptCountForStage(RuntimeIdGenerator.getStageIdFromTaskId(taskId));
 
     switch (failureCause) {
-      // Previous task group must be re-executed, and incomplete task groups of the belonging stage must be rescheduled.
+      // Previous task must be re-executed, and incomplete tasks of the belonging stage must be rescheduled.
       case INPUT_READ_FAILURE:
         if (attemptIdx == attemptIndexForStage) {
-          jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-          LOG.info("All task groups of {} will be made failed_recoverable.", stageId);
+          jobStateManager.onTaskStateChanged(taskId, newState);
+          LOG.info("All tasks of {} will be made failed_recoverable.", stageId);
           for (final PhysicalStage stage : physicalPlan.getStageDAG().getTopologicalSort()) {
             if (stage.getId().equals(stageId)) {
-              LOG.info("Removing TaskGroups for {} before they are scheduled to an executor", stage.getId());
-              pendingTaskGroupCollection.removeTaskGroupsAndDescendants(stage.getId());
-              stage.getTaskGroupIds().forEach(dstTaskGroupId -> {
-                if (jobStateManager.getTaskGroupState(dstTaskGroupId).getStateMachine().getCurrentState()
-                    != TaskGroupState.State.COMPLETE) {
-                  jobStateManager.onTaskGroupStateChanged(dstTaskGroupId, TaskGroupState.State.FAILED_RECOVERABLE);
-                  blockManagerMaster.onProducerTaskGroupFailed(dstTaskGroupId);
+              LOG.info("Removing Tasks for {} before they are scheduled to an executor", stage.getId());
+              pendingTaskCollection.removeTasksAndDescendants(stage.getId());
+              stage.getTaskIds().forEach(dstTaskId -> {
+                if (jobStateManager.getTaskState(dstTaskId).getStateMachine().getCurrentState()
+                    != TaskState.State.COMPLETE) {
+                  jobStateManager.onTaskStateChanged(dstTaskId, TaskState.State.FAILED_RECOVERABLE);
+                  blockManagerMaster.onProducerTaskFailed(dstTaskId);
                 }
               });
               break;
             }
           }
-          // the stage this task group belongs to has become failed recoverable.
+          // the stage this task belongs to has become failed recoverable.
           // it is a good point to start searching for another stage to schedule.
           scheduleNextStage(stageId);
         } else if (attemptIdx < attemptIndexForStage) {
           // if attemptIdx < attemptIndexForStage, we can ignore this late arriving message.
-          LOG.info("{} state change to failed_recoverable arrived late, we will ignore this.", taskGroupId);
+          LOG.info("{} state change to failed_recoverable arrived late, we will ignore this.", taskId);
         } else {
-          throw new SchedulingException(new Throwable("AttemptIdx for a task group cannot be greater than its stage"));
+          throw new SchedulingException(new Throwable("AttemptIdx for a task cannot be greater than its stage"));
         }
         break;
-      // The task group executed successfully but there is something wrong with the output store.
+      // The task executed successfully but there is something wrong with the output store.
       case OUTPUT_WRITE_FAILURE:
-        jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-        LOG.info("Only the failed task group will be retried.");
+        jobStateManager.onTaskStateChanged(taskId, newState);
+        LOG.info("Only the failed task will be retried.");
 
-        // the stage this task group belongs to has become failed recoverable.
+        // the stage this task belongs to has become failed recoverable.
         // it is a good point to start searching for another stage to schedule.
-        blockManagerMaster.onProducerTaskGroupFailed(taskGroupId);
+        blockManagerMaster.onProducerTaskFailed(taskId);
         scheduleNextStage(stageId);
         break;
       case CONTAINER_FAILURE:
-        jobStateManager.onTaskGroupStateChanged(taskGroupId, newState);
-        LOG.info("Only the failed task group will be retried.");
+        jobStateManager.onTaskStateChanged(taskId, newState);
+        LOG.info("Only the failed task will be retried.");
         break;
       default:
         throw new UnknownFailureCauseException(new Throwable("Unknown cause: " + failureCause));
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
index ac02183..1c38e35 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -46,10 +46,10 @@ public final class CompositeSchedulingPolicy implements SchedulingPolicy {
 
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+                                                             final ScheduledTask scheduledTask) {
     Set<ExecutorRepresenter> candidates = executorRepresenterSet;
     for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) {
-      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, scheduledTaskGroup);
+      candidates = schedulingPolicy.filterExecutorRepresenters(candidates, scheduledTask);
     }
     return candidates;
   }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
index b88c0b6..d64fffb 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -36,21 +36,21 @@ public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolic
 
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type.
-   *                               If the container type of target TaskGroup is NONE, it will return the original set.
-   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   *                               If the container type of target Task is NONE, it will return the original set.
+   * @param scheduledTask {@link ScheduledTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+                                                             final ScheduledTask scheduledTask) {
 
-    if (scheduledTaskGroup.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
+    if (scheduledTask.getContainerType().equals(ExecutorPlacementProperty.NONE)) {
       return executorRepresenterSet;
     }
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getContainerType().equals(scheduledTaskGroup.getContainerType()))
+            .filter(executor -> executor.getContainerType().equals(scheduledTask.getContainerType()))
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
index 1eb2958..2f598d2 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
 import javax.inject.Inject;
@@ -24,7 +24,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This policy finds executor that has free slot for a TaskGroup.
+ * This policy finds executor that has free slot for a Task.
  */
 public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
   @VisibleForTesting
@@ -35,15 +35,15 @@ public final class FreeSlotSchedulingPolicy implements SchedulingPolicy {
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors.
    *                               Executors that do not have any free slots will be filtered by this policy.
-   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @param scheduledTask {@link ScheduledTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+                                                             final ScheduledTask scheduledTask) {
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-            .filter(executor -> executor.getRunningTaskGroups().size() < executor.getExecutorCapacity())
+            .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity())
             .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
similarity index 53%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
index db3a084..0fcbe74 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskGroupCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/PendingTaskCollection.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -27,36 +27,36 @@ import java.util.NoSuchElementException;
 import java.util.Optional;
 
 /**
- * Keep tracks of all pending task groups.
- * {@link Scheduler} enqueues the TaskGroups to schedule to this queue.
- * {@link SchedulerRunner} refers to this queue when scheduling TaskGroups.
+ * Keep tracks of all pending tasks.
+ * {@link Scheduler} enqueues the Tasks to schedule to this queue.
+ * {@link SchedulerRunner} refers to this queue when scheduling Tasks.
  */
 @ThreadSafe
 @DriverSide
-@DefaultImplementation(SingleJobTaskGroupCollection.class)
-public interface PendingTaskGroupCollection {
+@DefaultImplementation(SingleJobTaskCollection.class)
+public interface PendingTaskCollection {
 
   /**
-   * Adds a TaskGroup to this collection.
-   * @param scheduledTaskGroup to add.
+   * Adds a Task to this collection.
+   * @param scheduledTask to add.
    */
-  void add(final ScheduledTaskGroup scheduledTaskGroup);
+  void add(final ScheduledTask scheduledTask);
 
   /**
-   * Removes the specified TaskGroup to be scheduled.
-   * @param taskGroupId id of the TaskGroup
-   * @return the specified TaskGroup
-   * @throws NoSuchElementException if the specified TaskGroup is not in the queue,
-   *                                or removing this TaskGroup breaks scheduling order
+   * Removes the specified Task to be scheduled.
+   * @param taskId id of the Task
+   * @return the specified Task
+   * @throws NoSuchElementException if the specified Task is not in the queue,
+   *                                or removing this Task breaks scheduling order
    */
-  ScheduledTaskGroup remove(final String taskGroupId) throws NoSuchElementException;
+  ScheduledTask remove(final String taskId) throws NoSuchElementException;
 
   /**
-   * Peeks TaskGroups that can be scheduled according to job dependency priority.
+   * Peeks Tasks that can be scheduled according to job dependency priority.
    * Changes to the queue must not reflected to the returned collection to avoid concurrent modification.
-   * @return TaskGroups that can be scheduled, or {@link Optional#empty()} if the queue is empty
+   * @return Tasks that can be scheduled, or {@link Optional#empty()} if the queue is empty
    */
-  Optional<Collection<ScheduledTaskGroup>> peekSchedulableTaskGroups();
+  Optional<Collection<ScheduledTask>> peekSchedulableTasks();
 
   /**
    * Registers a job to this queue in case the queue needs to understand the topology of the job DAG.
@@ -67,14 +67,14 @@ public interface PendingTaskGroupCollection {
   /**
    * Removes a stage and its descendant stages from this queue.
    * This is to be used for fault tolerance purposes,
-   * say when a stage fails and all affected TaskGroups must be removed.
-   * @param stageIdOfTaskGroups for the stage to begin the removal recursively.
+   * say when a stage fails and all affected Tasks must be removed.
+   * @param stageIdOfTasks for the stage to begin the removal recursively.
    */
-  void removeTaskGroupsAndDescendants(final String stageIdOfTaskGroups);
+  void removeTasksAndDescendants(final String stageIdOfTasks);
 
   /**
-   * Checks whether there are schedulable TaskGroups in the queue or not.
-   * @return true if there are schedulable TaskGroups in the queue, false otherwise.
+   * Checks whether there are schedulable Tasks in the queue or not.
+   * @return true if there are schedulable Tasks in the queue, false otherwise.
    */
   boolean isEmpty();
 
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
index ae19b66..61077fa 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/RoundRobinSchedulingPolicy.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -33,7 +33,7 @@ import java.util.stream.Collectors;
  * A Round-Robin implementation used by {@link BatchSingleJobScheduler}.
  *
  * This policy keeps a list of available {@link ExecutorRepresenter} for each type of container.
- * The RR policy is used for each container type when trying to schedule a task group.
+ * The RR policy is used for each container type when trying to schedule a task.
  */
 @ThreadSafe
 @DriverSide
@@ -47,15 +47,15 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
 
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by round robin behaviour.
-   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @param scheduledTask {@link ScheduledTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+                                                             final ScheduledTask scheduledTask) {
     final OptionalInt minOccupancy =
         executorRepresenterSet.stream()
-        .map(executor -> executor.getRunningTaskGroups().size())
+        .map(executor -> executor.getRunningTasks().size())
         .mapToInt(i -> i).min();
 
     if (!minOccupancy.isPresent()) {
@@ -64,7 +64,7 @@ public final class RoundRobinSchedulingPolicy implements SchedulingPolicy {
 
     final Set<ExecutorRepresenter> candidateExecutors =
         executorRepresenterSet.stream()
-        .filter(executor -> executor.getRunningTaskGroups().size() == minOccupancy.getAsInt())
+        .filter(executor -> executor.getRunningTasks().size() == minOccupancy.getAsInt())
         .collect(Collectors.toSet());
 
     return candidateExecutors;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
index deef9d8..a7a9c36 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/Scheduler.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -27,12 +27,12 @@ import javax.annotation.Nullable;
 
 /**
  * Only two threads call scheduling code: RuntimeMaster thread (RMT), and SchedulerThread(ST).
- * RMT and ST meet only at two points: SchedulingPolicy, and PendingTaskGroupCollection,
+ * RMT and ST meet only at two points: SchedulingPolicy, and PendingTaskCollection,
  * which are synchronized(ThreadSafe).
  * Other scheduler-related classes that are accessed by only one of the two threads are not synchronized(NotThreadSafe).
  *
  * Receives jobs to execute and schedules
- * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup} to executors.
+ * {@link edu.snu.nemo.runtime.common.plan.physical.ScheduledTask} to executors.
  */
 @DriverSide
 @DefaultImplementation(BatchSingleJobScheduler.class)
@@ -50,7 +50,7 @@ public interface Scheduler {
    * Receives and updates the scheduler with a new physical plan for a job.
    * @param jobId the ID of the job to change the physical plan.
    * @param newPhysicalPlan new physical plan for the job.
-   * @param taskInfo pair containing the information of the executor id and task group id to mark as complete after the
+   * @param taskInfo pair containing the information of the executor id and task id to mark as complete after the
    *                 update.
    */
   void updateJob(String jobId, PhysicalPlan newPhysicalPlan, Pair<String, String> taskInfo);
@@ -68,21 +68,21 @@ public interface Scheduler {
   void onExecutorRemoved(String executorId);
 
   /**
-   * Called when a TaskGroup's execution state changes.
-   * @param executorId of the executor in which the TaskGroup is executing.
-   * @param taskGroupId of the TaskGroup whose state must be updated.
-   * @param newState for the TaskGroup.
-   * @param attemptIdx the number of times this TaskGroup has executed.
+   * Called when a Task's execution state changes.
+   * @param executorId of the executor in which the Task is executing.
+   * @param taskId of the Task whose state must be updated.
+   * @param newState for the Task.
+   * @param attemptIdx the number of times this Task has executed.
    *************** the below parameters are only valid for failures *****************
    * @param taskPutOnHold the ID of task that are put on hold. It is null otherwise.
-   * @param failureCause for which the TaskGroup failed in the case of a recoverable failure.
+   * @param failureCause for which the Task failed in the case of a recoverable failure.
    */
-  void onTaskGroupStateChanged(String executorId,
-                               String taskGroupId,
-                               TaskGroupState.State newState,
-                               int attemptIdx,
-                               @Nullable String taskPutOnHold,
-                               TaskGroupState.RecoverableFailureCause failureCause);
+  void onTaskStateChanged(String executorId,
+                          String taskId,
+                          TaskState.State newState,
+                          int attemptIdx,
+                          @Nullable String taskPutOnHold,
+                          TaskState.RecoverableFailureCause failureCause);
 
   /**
    * To be called when a job should be terminated.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
index 49ca72e..f7db251 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java
@@ -16,9 +16,9 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.common.state.JobState;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
@@ -39,14 +39,14 @@ import javax.annotation.concurrent.NotThreadSafe;
 import javax.inject.Inject;
 
 /**
- * Takes a TaskGroup from the pending queue and schedules it to an executor.
+ * Takes a Task from the pending queue and schedules it to an executor.
  */
 @DriverSide
 @NotThreadSafe
 public final class SchedulerRunner {
   private static final Logger LOG = LoggerFactory.getLogger(SchedulerRunner.class.getName());
   private final Map<String, JobStateManager> jobStateManagers;
-  private final PendingTaskGroupCollection pendingTaskGroupCollection;
+  private final PendingTaskCollection pendingTaskCollection;
   private final ExecutorService schedulerThread;
   private boolean initialJobScheduled;
   private boolean isTerminated;
@@ -58,10 +58,10 @@ public final class SchedulerRunner {
   @VisibleForTesting
   @Inject
   public SchedulerRunner(final SchedulingPolicy schedulingPolicy,
-                         final PendingTaskGroupCollection pendingTaskGroupCollection,
+                         final PendingTaskCollection pendingTaskCollection,
                          final ExecutorRegistry executorRegistry) {
     this.jobStateManagers = new HashMap<>();
-    this.pendingTaskGroupCollection = pendingTaskGroupCollection;
+    this.pendingTaskCollection = pendingTaskCollection;
     this.schedulerThread = Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, "SchedulerRunner"));
     this.initialJobScheduled = false;
     this.isTerminated = false;
@@ -77,9 +77,9 @@ public final class SchedulerRunner {
   }
 
   /**
-   * Signals to the condition on TaskGroup availability.
+   * Signals to the condition on Task availability.
    */
-  public void onATaskGroupAvailable() {
+  public void onATaskAvailable() {
     mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
   }
 
@@ -110,7 +110,7 @@ public final class SchedulerRunner {
   }
 
   /**
-   * A separate thread is run to schedule task groups to executors.
+   * A separate thread is run to schedule tasks to executors.
    */
   private final class SchedulerThread implements Runnable {
     @Override
@@ -122,18 +122,18 @@ public final class SchedulerRunner {
         // Iteration guard
         mustCheckSchedulingAvailabilityOrSchedulerTerminated.await();
 
-        final Collection<ScheduledTaskGroup> schedulableTaskGroups = pendingTaskGroupCollection
-            .peekSchedulableTaskGroups().orElse(null);
-        if (schedulableTaskGroups == null) {
-          // TaskGroup queue is empty
-          LOG.debug("PendingTaskGroupCollection is empty. Awaiting for more TaskGroups...");
+        final Collection<ScheduledTask> schedulableTasks = pendingTaskCollection
+            .peekSchedulableTasks().orElse(null);
+        if (schedulableTasks == null) {
+          // Task queue is empty
+          LOG.debug("PendingTaskCollection is empty. Awaiting for more Tasks...");
           continue;
         }
 
-        int numScheduledTaskGroups = 0;
-        for (final ScheduledTaskGroup schedulableTaskGroup : schedulableTaskGroups) {
-          final JobStateManager jobStateManager = jobStateManagers.get(schedulableTaskGroup.getJobId());
-          LOG.debug("Trying to schedule {}...", schedulableTaskGroup.getTaskGroupId());
+        int numScheduledTasks = 0;
+        for (final ScheduledTask schedulableTask : schedulableTasks) {
+          final JobStateManager jobStateManager = jobStateManagers.get(schedulableTask.getJobId());
+          LOG.debug("Trying to schedule {}...", schedulableTask.getTaskId());
 
           final Set<ExecutorRepresenter> runningExecutorRepresenter =
               executorRegistry.getRunningExecutorIds().stream()
@@ -141,26 +141,26 @@ public final class SchedulerRunner {
               .collect(Collectors.toSet());
 
           final Set<ExecutorRepresenter> candidateExecutors =
-              schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, schedulableTaskGroup);
+              schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, schedulableTask);
 
           if (candidateExecutors.size() != 0) {
-            jobStateManager.onTaskGroupStateChanged(schedulableTaskGroup.getTaskGroupId(),
-                TaskGroupState.State.EXECUTING);
+            jobStateManager.onTaskStateChanged(schedulableTask.getTaskId(),
+                TaskState.State.EXECUTING);
             final ExecutorRepresenter executor = candidateExecutors.stream().findFirst().get();
-            executor.onTaskGroupScheduled(schedulableTaskGroup);
+            executor.onTaskScheduled(schedulableTask);
 
-            pendingTaskGroupCollection.remove(schedulableTaskGroup.getTaskGroupId());
-            numScheduledTaskGroups++;
-            LOG.debug("Successfully scheduled {}", schedulableTaskGroup.getTaskGroupId());
+            pendingTaskCollection.remove(schedulableTask.getTaskId());
+            numScheduledTasks++;
+            LOG.debug("Successfully scheduled {}", schedulableTask.getTaskId());
           } else {
-            LOG.debug("Failed to schedule {}", schedulableTaskGroup.getTaskGroupId());
+            LOG.debug("Failed to schedule {}", schedulableTask.getTaskId());
           }
         }
 
-        LOG.debug("Examined {} TaskGroups, scheduled {} TaskGroups",
-            schedulableTaskGroups.size(), numScheduledTaskGroups);
-        if (schedulableTaskGroups.size() == numScheduledTaskGroups) {
-          // Scheduled all TaskGroups in the stage
+        LOG.debug("Examined {} Tasks, scheduled {} Tasks",
+            schedulableTasks.size(), numScheduledTasks);
+        if (schedulableTasks.size() == numScheduledTasks) {
+          // Scheduled all Tasks in the stage
           // Immediately run next iteration to check whether there is another schedulable stage
           LOG.debug("Trying to schedule next Stage in the ScheduleGroup (if any)...");
           mustCheckSchedulingAvailabilityOrSchedulerTerminated.signal();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
index b0e39ac..ab9e251 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.tang.annotations.DefaultImplementation;
@@ -32,5 +32,5 @@ import java.util.Set;
 @DefaultImplementation(CompositeSchedulingPolicy.class)
 public interface SchedulingPolicy {
   Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                      final ScheduledTaskGroup scheduledTaskGroup);
+                                                      final ScheduledTask scheduledTask);
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
similarity index 61%
rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
index 7300531..526fbec 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskGroupCollection.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SingleJobTaskCollection.java
@@ -20,7 +20,7 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import net.jcip.annotations.ThreadSafe;
 import org.apache.reef.annotations.audience.DriverSide;
 
@@ -29,103 +29,103 @@ import java.util.*;
 import java.util.concurrent.*;
 
 /**
- * {@link PendingTaskGroupCollection} implementation.
- * This class provides two-level scheduling by keeping track of schedulable stages and stage-TaskGroup membership.
- * {@link #peekSchedulableTaskGroups()} returns collection of TaskGroups which belong to one of the schedulable stages.
+ * {@link PendingTaskCollection} implementation.
+ * This class provides two-level scheduling by keeping track of schedulable stages and stage-Task membership.
+ * {@link #peekSchedulableTasks()} returns collection of Tasks which belong to one of the schedulable stages.
  */
 @ThreadSafe
 @DriverSide
-public final class SingleJobTaskGroupCollection implements PendingTaskGroupCollection {
+public final class SingleJobTaskCollection implements PendingTaskCollection {
   private PhysicalPlan physicalPlan;
 
   /**
-   * Pending TaskGroups awaiting to be scheduled for each stage.
+   * Pending Tasks awaiting to be scheduled for each stage.
    */
-  private final ConcurrentMap<String, Map<String, ScheduledTaskGroup>> stageIdToPendingTaskGroups;
+  private final ConcurrentMap<String, Map<String, ScheduledTask>> stageIdToPendingTasks;
 
   /**
-   * Stages with TaskGroups that have not yet been scheduled.
+   * Stages with Tasks that have not yet been scheduled.
    */
   private final BlockingDeque<String> schedulableStages;
 
   @Inject
-  public SingleJobTaskGroupCollection() {
-    stageIdToPendingTaskGroups = new ConcurrentHashMap<>();
+  public SingleJobTaskCollection() {
+    stageIdToPendingTasks = new ConcurrentHashMap<>();
     schedulableStages = new LinkedBlockingDeque<>();
   }
 
   @Override
-  public synchronized void add(final ScheduledTaskGroup scheduledTaskGroup) {
-    final String stageId = RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
-
-    stageIdToPendingTaskGroups.compute(stageId, (s, taskGroupIdToTaskGroup) -> {
-      if (taskGroupIdToTaskGroup == null) {
-        final Map<String, ScheduledTaskGroup> taskGroupIdToTaskGroupMap = new HashMap<>();
-        taskGroupIdToTaskGroupMap.put(scheduledTaskGroup.getTaskGroupId(), scheduledTaskGroup);
-        updateSchedulableStages(stageId, scheduledTaskGroup.getContainerType());
-        return taskGroupIdToTaskGroupMap;
+  public synchronized void add(final ScheduledTask scheduledTask) {
+    final String stageId = RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
+
+    stageIdToPendingTasks.compute(stageId, (s, taskIdToTask) -> {
+      if (taskIdToTask == null) {
+        final Map<String, ScheduledTask> taskIdToTaskMap = new HashMap<>();
+        taskIdToTaskMap.put(scheduledTask.getTaskId(), scheduledTask);
+        updateSchedulableStages(stageId, scheduledTask.getContainerType());
+        return taskIdToTaskMap;
       } else {
-        taskGroupIdToTaskGroup.put(scheduledTaskGroup.getTaskGroupId(), scheduledTaskGroup);
-        return taskGroupIdToTaskGroup;
+        taskIdToTask.put(scheduledTask.getTaskId(), scheduledTask);
+        return taskIdToTask;
       }
     });
   }
 
   /**
-   * Removes the specified TaskGroup to be scheduled.
-   * The specified TaskGroup should belong to the collection from {@link #peekSchedulableTaskGroups()}.
-   * @param taskGroupId id of the TaskGroup
-   * @return the specified TaskGroup
-   * @throws NoSuchElementException if the specified TaskGroup is not in the queue,
-   *                                or removing this TaskGroup breaks scheduling order
-   *                                (i.e. does not belong to the collection from {@link #peekSchedulableTaskGroups()}.
+   * Removes the specified Task to be scheduled.
+   * The specified Task should belong to the collection from {@link #peekSchedulableTasks()}.
+   * @param taskId id of the Task
+   * @return the specified Task
+   * @throws NoSuchElementException if the specified Task is not in the queue,
+   *                                or removing this Task breaks scheduling order
+   *                                (i.e. does not belong to the collection from {@link #peekSchedulableTasks()}.
    */
   @Override
-  public synchronized ScheduledTaskGroup remove(final String taskGroupId) throws NoSuchElementException {
+  public synchronized ScheduledTask remove(final String taskId) throws NoSuchElementException {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
-      throw new NoSuchElementException("No schedulable stage in TaskGroup queue");
+      throw new NoSuchElementException("No schedulable stage in Task queue");
     }
 
-    final Map<String, ScheduledTaskGroup> pendingTaskGroupsForStage = stageIdToPendingTaskGroups.get(stageId);
+    final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
 
-    if (pendingTaskGroupsForStage == null) {
-      throw new RuntimeException(String.format("Stage %s not found in TaskGroup queue", stageId));
+    if (pendingTasksForStage == null) {
+      throw new RuntimeException(String.format("Stage %s not found in Task queue", stageId));
     }
-    final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupsForStage.remove(taskGroupId);
-    if (taskGroupToSchedule == null) {
-      throw new NoSuchElementException(String.format("TaskGroup %s not found in TaskGroup queue", taskGroupId));
+    final ScheduledTask taskToSchedule = pendingTasksForStage.remove(taskId);
+    if (taskToSchedule == null) {
+      throw new NoSuchElementException(String.format("Task %s not found in Task queue", taskId));
     }
-    if (pendingTaskGroupsForStage.isEmpty()) {
+    if (pendingTasksForStage.isEmpty()) {
       if (!schedulableStages.pollFirst().equals(stageId)) {
         throw new RuntimeException(String.format("Expected stage %s to be polled", stageId));
       }
-      stageIdToPendingTaskGroups.remove(stageId);
-      stageIdToPendingTaskGroups.forEach((scheduledStageId, taskGroups) ->
-          updateSchedulableStages(scheduledStageId, taskGroups.values().iterator().next().getContainerType()));
+      stageIdToPendingTasks.remove(stageId);
+      stageIdToPendingTasks.forEach((scheduledStageId, tasks) ->
+          updateSchedulableStages(scheduledStageId, tasks.values().iterator().next().getContainerType()));
     }
 
-    return taskGroupToSchedule;
+    return taskToSchedule;
   }
 
   /**
-   * Peeks TaskGroups that can be scheduled.
-   * @return TaskGroups to be scheduled, or {@link Optional#empty()} if the queue is empty
-   * @return collection of TaskGroups which belong to one of the schedulable stages
+   * Peeks Tasks that can be scheduled.
+   * @return Tasks to be scheduled, or {@link Optional#empty()} if the queue is empty
+   * @return collection of Tasks which belong to one of the schedulable stages
    *         or {@link Optional#empty} if the queue is empty
    */
   @Override
-  public synchronized Optional<Collection<ScheduledTaskGroup>> peekSchedulableTaskGroups() {
+  public synchronized Optional<Collection<ScheduledTask>> peekSchedulableTasks() {
     final String stageId = schedulableStages.peekFirst();
     if (stageId == null) {
       return Optional.empty();
     }
 
-    final Map<String, ScheduledTaskGroup> pendingTaskGroupsForStage = stageIdToPendingTaskGroups.get(stageId);
-    if (pendingTaskGroupsForStage == null) {
-      throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTaskGroups map", stageId));
+    final Map<String, ScheduledTask> pendingTasksForStage = stageIdToPendingTasks.get(stageId);
+    if (pendingTasksForStage == null) {
+      throw new RuntimeException(String.format("Stage %s not found in stageIdToPendingTasks map", stageId));
     }
-    return Optional.of(new ArrayList<>(pendingTaskGroupsForStage.values()));
+    return Optional.of(new ArrayList<>(pendingTasksForStage.values()));
   }
 
   /**
@@ -133,7 +133,7 @@ public final class SingleJobTaskGroupCollection implements PendingTaskGroupColle
    * @param stageId for the stage to begin the removal recursively.
    */
   @Override
-  public synchronized void removeTaskGroupsAndDescendants(final String stageId) {
+  public synchronized void removeTasksAndDescendants(final String stageId) {
     removeStageAndChildren(stageId);
   }
 
@@ -143,7 +143,7 @@ public final class SingleJobTaskGroupCollection implements PendingTaskGroupColle
    */
   private synchronized void removeStageAndChildren(final String stageId) {
     if (schedulableStages.remove(stageId)) {
-      stageIdToPendingTaskGroups.remove(stageId);
+      stageIdToPendingTasks.remove(stageId);
     }
 
     physicalPlan.getStageDAG().getChildren(stageId).forEach(
@@ -211,6 +211,6 @@ public final class SingleJobTaskGroupCollection implements PendingTaskGroupColle
   @Override
   public synchronized void close() {
     schedulableStages.clear();
-    stageIdToPendingTaskGroups.clear();
+    stageIdToPendingTasks.clear();
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
index c5eee7f..5432f0b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java
@@ -17,7 +17,7 @@ package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.Readable;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
@@ -29,7 +29,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 /**
- * This policy is same as {@link RoundRobinSchedulingPolicy}, however for TaskGroups
+ * This policy is same as {@link RoundRobinSchedulingPolicy}, however for Tasks
  * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors
  * where the corresponding data resides.
  */
@@ -45,7 +45,7 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
 
   /**
    * @param readables collection of readables
-   * @return Set of source locations from source tasks in {@code taskGroupDAG}
+   * @return Set of source locations from source tasks in {@code taskDAG}
    * @throws Exception for any exception raised during querying source locations for a readable
    */
   private static Set<String> getSourceLocations(final Collection<Readable> readables) throws Exception {
@@ -59,15 +59,15 @@ public final class SourceLocationAwareSchedulingPolicy implements SchedulingPoli
   /**
    * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location.
    *                               If there is no source locations, will return original set.
-   * @param scheduledTaskGroup {@link ScheduledTaskGroup} to be scheduled.
+   * @param scheduledTask {@link ScheduledTask} to be scheduled.
    * @return filtered Set of {@link ExecutorRepresenter}.
    */
   @Override
   public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet,
-                                                             final ScheduledTaskGroup scheduledTaskGroup) {
+                                                             final ScheduledTask scheduledTask) {
     final Set<String> sourceLocations;
     try {
-      sourceLocations = getSourceLocations(scheduledTaskGroup.getLogicalTaskIdToReadable().values());
+      sourceLocations = getSourceLocations(scheduledTask.getLogicalTaskIdToReadable().values());
     } catch (final UnsupportedOperationException e) {
       return executorRepresenterSet;
     } catch (final Exception e) {
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
index 585cdda..47c052f 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java
@@ -65,7 +65,7 @@ public final class BatchSingleJobSchedulerTest {
   private SchedulerRunner schedulerRunner;
   private ExecutorRegistry executorRegistry;
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskGroupCollection pendingTaskGroupCollection;
+  private PendingTaskCollection pendingTaskCollection;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
@@ -73,7 +73,7 @@ public final class BatchSingleJobSchedulerTest {
 
   private static final int EXECUTOR_CAPACITY = 20;
 
-  // This schedule index will make sure that task group events are not ignored
+  // This schedule index will make sure that task events are not ignored
   private static final int MAGIC_SCHEDULE_ATTEMPT_INDEX = Integer.MAX_VALUE;
 
   @Before
@@ -83,13 +83,13 @@ public final class BatchSingleJobSchedulerTest {
 
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
     metricMessageHandler = mock(MetricMessageHandler.class);
-    pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
+    pendingTaskCollection = new SingleJobTaskCollection();
     schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection, executorRegistry);
+    schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollection, executorRegistry);
     pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupCollection,
+        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskCollection,
             blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
 
     final ActiveContext activeContext = mock(ActiveContext.class);
@@ -125,7 +125,7 @@ public final class BatchSingleJobSchedulerTest {
 
   /**
    * This method builds a physical DAG starting from an IR DAG and submits it to {@link BatchSingleJobScheduler}.
-   * TaskGroup state changes are explicitly submitted to scheduler instead of executor messages.
+   * Task state changes are explicitly submitted to scheduler instead of executor messages.
    */
   @Test(timeout=10000)
   public void testPull() throws Exception {
@@ -135,7 +135,7 @@ public final class BatchSingleJobSchedulerTest {
 
   /**
    * This method builds a physical DAG starting from an IR DAG and submits it to {@link BatchSingleJobScheduler}.
-   * TaskGroup state changes are explicitly submitted to scheduler instead of executor messages.
+   * Task state changes are explicitly submitted to scheduler instead of executor messages.
    */
   @Test(timeout=10000)
   public void testPush() throws Exception {
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
index b95c2a8..07d36a2 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java
@@ -16,7 +16,7 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.ContainerTypeAwareSchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
@@ -34,7 +34,7 @@ import static org.mockito.Mockito.*;
  * Tests {@link ContainerTypeAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
 public final class ContainerTypeAwareSchedulingPolicyTest {
 
   private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) {
@@ -50,24 +50,24 @@ public final class ContainerTypeAwareSchedulingPolicyTest {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE);
 
-    final ScheduledTaskGroup scheduledTaskGroup1 = mock(ScheduledTaskGroup.class);
-    when(scheduledTaskGroup1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
+    final ScheduledTask scheduledTask1 = mock(ScheduledTask.class);
+    when(scheduledTask1.getContainerType()).thenReturn(ExecutorPlacementProperty.RESERVED);
 
     final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors1 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, scheduledTaskGroup1);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, scheduledTask1);
 
     final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors1, candidateExecutors1);
 
-    final ScheduledTaskGroup scheduledTaskGroup2 = mock(ScheduledTaskGroup.class);
-    when(scheduledTaskGroup2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
+    final ScheduledTask scheduledTask2 = mock(ScheduledTask.class);
+    when(scheduledTask2.getContainerType()).thenReturn(ExecutorPlacementProperty.NONE);
 
     final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors2 =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, scheduledTaskGroup2);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, scheduledTask2);
 
     final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2));
     assertEquals(expectedExecutors2, candidateExecutors2);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
index bd17124..d2ee903 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java
@@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageSender;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.MetricMessageHandler;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
@@ -71,7 +71,7 @@ public final class FaultToleranceTest {
   private ExecutorRegistry executorRegistry;
 
   private MetricMessageHandler metricMessageHandler;
-  private PendingTaskGroupCollection pendingTaskGroupCollection;
+  private PendingTaskCollection pendingTaskCollection;
   private PubSubEventHandlerWrapper pubSubEventHandler;
   private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler;
   private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class);
@@ -95,16 +95,16 @@ public final class FaultToleranceTest {
     final Injector injector = Tang.Factory.getTang().newInjector();
     executorRegistry = injector.getInstance(ExecutorRegistry.class);
 
-    pendingTaskGroupCollection = new SingleJobTaskGroupCollection();
+    pendingTaskCollection = new SingleJobTaskCollection();
     schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
 
     if (useMockSchedulerRunner) {
       schedulerRunner = mock(SchedulerRunner.class);
     } else {
-      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskGroupCollection, executorRegistry);
+      schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollection, executorRegistry);
     }
     scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskGroupCollection,
+        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, pendingTaskCollection,
             blockManagerMaster, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
 
     // Add nodes
@@ -146,20 +146,20 @@ public final class FaultToleranceTest {
     for (final PhysicalStage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskGroupCollection.isEmpty());
-        stage.getTaskGroupIds().forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.COMPLETE, 1));
+        assertTrue(pendingTaskCollection.isEmpty());
+        stage.getTaskIds().forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
         scheduler.onExecutorRemoved("a3");
-        // There are 2 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
-        // Due to round robin scheduling, "a2" is assured to have a running TaskGroup.
+        // Due to round robin scheduling, "a2" is assured to have a running Task.
         scheduler.onExecutorRemoved("a2");
 
         while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != EXECUTING) {
@@ -167,16 +167,16 @@ public final class FaultToleranceTest {
         }
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2);
 
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskGroupCollection.isEmpty());
-        stage.getTaskGroupIds().forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.COMPLETE, 1));
+        assertTrue(pendingTaskCollection.isEmpty());
+        stage.getTaskIds().forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.COMPLETE, 1));
       } else {
-        // There are 1 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 2.
-        // Schedule only the first TaskGroup
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2.
+        // Schedule only the first Task
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, true);
       }
     }
@@ -214,32 +214,32 @@ public final class FaultToleranceTest {
     for (final PhysicalStage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskGroupCollection.isEmpty());
-        stage.getTaskGroupIds().forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.COMPLETE, 1));
+        assertTrue(pendingTaskCollection.isEmpty());
+        stage.getTaskIds().forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskGroupCollection.isEmpty());
-        stage.getTaskGroupIds().forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
-                TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
+        assertTrue(pendingTaskCollection.isEmpty());
+        stage.getTaskIds().forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.FAILED_RECOVERABLE, 1,
+                TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE));
 
         while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != EXECUTING) {
 
         }
 
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 3);
-        assertFalse(pendingTaskGroupCollection.isEmpty());
-        stage.getTaskGroupIds().forEach(taskGroupId -> {
-          assertEquals(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(),
-              TaskGroupState.State.READY);
+        assertFalse(pendingTaskCollection.isEmpty());
+        stage.getTaskIds().forEach(taskId -> {
+          assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
+              TaskState.State.READY);
         });
       }
     }
@@ -277,41 +277,41 @@ public final class FaultToleranceTest {
     for (final PhysicalStage stage : dagOf4Stages) {
       if (stage.getScheduleGroupIndex() == 0) {
 
-        // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0.
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0.
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
-        assertTrue(pendingTaskGroupCollection.isEmpty());
-        stage.getTaskGroupIds().forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.COMPLETE, 1));
+        assertTrue(pendingTaskCollection.isEmpty());
+        stage.getTaskIds().forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.COMPLETE, 1));
       } else if (stage.getScheduleGroupIndex() == 1) {
-        // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1.
-        SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager,
+        // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 1.
+        SchedulerTestUtil.mockSchedulerRunner(pendingTaskCollection, schedulingPolicy, jobStateManager,
             executorRegistry, false);
 
-        stage.getTaskGroupIds().forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1,
-                TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE));
+        stage.getTaskIds().forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.FAILED_RECOVERABLE, 1,
+                TaskState.RecoverableFailureCause.INPUT_READ_FAILURE));
 
         while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != EXECUTING) {
 
         }
 
         assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2);
-        stage.getTaskGroupIds().forEach(taskGroupId -> {
-          assertEquals(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(),
-              TaskGroupState.State.READY);
+        stage.getTaskIds().forEach(taskId -> {
+          assertEquals(jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState(),
+              TaskState.State.READY);
         });
       }
     }
   }
 
   /**
-   * Tests the rescheduling of TaskGroups upon a failure.
+   * Tests the rescheduling of Tasks upon a failure.
    */
   @Test(timeout=10000)
-  public void testTaskGroupReexecutionForFailure() throws Exception {
+  public void testTaskReexecutionForFailure() throws Exception {
     final ActiveContext activeContext = mock(ActiveContext.class);
     Mockito.doThrow(new RuntimeException()).when(activeContext).close();
 
@@ -343,16 +343,16 @@ public final class FaultToleranceTest {
 
     for (final PhysicalStage stage : dagOf4Stages) {
       while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != COMPLETE) {
-        final Set<String> a1RunningTaskGroups = new HashSet<>(a1.getRunningTaskGroups());
-        final Set<String> a3RunningTaskGroups = new HashSet<>(a3.getRunningTaskGroups());
+        final Set<String> a1RunningTasks = new HashSet<>(a1.getRunningTasks());
+        final Set<String> a3RunningTasks = new HashSet<>(a3.getRunningTasks());
 
-        a1RunningTaskGroups.forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.COMPLETE, 1));
+        a1RunningTasks.forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.COMPLETE, 1));
 
-        a3RunningTaskGroups.forEach(taskGroupId ->
-            SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry,
-                taskGroupId, TaskGroupState.State.COMPLETE, 1));
+        a3RunningTasks.forEach(taskId ->
+            SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry,
+                taskId, TaskState.State.COMPLETE, 1));
       }
     }
     assertTrue(jobStateManager.checkJobTermination());
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
index 162a140..cb100f0 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.FreeSlotSchedulingPolicy;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
@@ -34,15 +34,15 @@ import static org.mockito.Mockito.*;
  * Tests {@link FreeSlotSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
 public final class FreeSlotSchedulingPolicyTest {
 
-  private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTaskGroups,
+  private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks,
                                                              final int capacity) {
     final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
-    final Set<String> runningTaskGroups = new HashSet<>();
-    IntStream.range(0, numRunningTaskGroups).forEach(i -> runningTaskGroups.add(String.valueOf(i)));
-    when(executorRepresenter.getRunningTaskGroups()).thenReturn(runningTaskGroups);
+    final Set<String> runningTasks = new HashSet<>();
+    IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(String.valueOf(i)));
+    when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
     when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
     return executorRepresenter;
   }
@@ -53,12 +53,12 @@ public final class FreeSlotSchedulingPolicyTest {
     final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
-    final ScheduledTaskGroup scheduledTaskGroup = mock(ScheduledTaskGroup.class);
+    final ScheduledTask scheduledTask = mock(ScheduledTask.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTaskGroup);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
index 04d3075..10cced0 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -32,14 +32,14 @@ import static org.mockito.Mockito.*;
  * Tests {@link RoundRobinSchedulingPolicy}
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class})
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class})
 public final class RoundRobinSchedulingPolicyTest {
 
-  private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTaskGroups) {
+  private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks) {
     final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class);
-    final Set<String> runningTaskGroups = new HashSet<>();
-    IntStream.range(0, numRunningTaskGroups).forEach(i -> runningTaskGroups.add(String.valueOf(i)));
-    when(executorRepresenter.getRunningTaskGroups()).thenReturn(runningTaskGroups);
+    final Set<String> runningTasks = new HashSet<>();
+    IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(String.valueOf(i)));
+    when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
     return executorRepresenter;
   }
 
@@ -50,12 +50,12 @@ public final class RoundRobinSchedulingPolicyTest {
     final ExecutorRepresenter a1 = mockExecutorRepresenter(2);
     final ExecutorRepresenter a2 = mockExecutorRepresenter(2);
 
-    final ScheduledTaskGroup scheduledTaskGroup = mock(ScheduledTaskGroup.class);
+    final ScheduledTask scheduledTask = mock(ScheduledTask.class);
 
     final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2));
 
     final Set<ExecutorRepresenter> candidateExecutors =
-        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTaskGroup);
+        schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, scheduledTask);
 
     final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0));
     assertEquals(expectedExecutors, candidateExecutors);
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
index e3b1abb..36b489a 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java
@@ -16,9 +16,9 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage;
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.runtime.common.state.StageState;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.runtime.master.JobStateManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 
@@ -30,7 +30,7 @@ import java.util.stream.Collectors;
  */
 public final class SchedulerTestUtil {
   /**
-   * Complete the stage by completing all of its TaskGroups.
+   * Complete the stage by completing all of its Tasks.
    * @param jobStateManager for the submitted job.
    * @param scheduler for the submitted job.
    * @param executorRegistry provides executor representers
@@ -48,12 +48,12 @@ public final class SchedulerTestUtil {
         // Stage has completed, so we break out of the loop.
         break;
       } else if (StageState.State.EXECUTING == stageState) {
-        physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-          final Enum tgState = jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState();
-          if (TaskGroupState.State.EXECUTING == tgState) {
-            sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId,
-                TaskGroupState.State.COMPLETE, attemptIdx, null);
-          } else if (TaskGroupState.State.READY == tgState || TaskGroupState.State.COMPLETE == tgState) {
+        physicalStage.getTaskIds().forEach(taskId -> {
+          final Enum tgState = jobStateManager.getTaskState(taskId).getStateMachine().getCurrentState();
+          if (TaskState.State.EXECUTING == tgState) {
+            sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId,
+                TaskState.State.COMPLETE, attemptIdx, null);
+          } else if (TaskState.State.READY == tgState || TaskState.State.COMPLETE == tgState) {
             // Skip READY (try in the next loop and see if it becomes EXECUTING) and COMPLETE.
           } else {
             throw new IllegalStateException(tgState.toString());
@@ -68,60 +68,60 @@ public final class SchedulerTestUtil {
   }
 
   /**
-   * Sends task group state change event to scheduler.
-   * This replaces executor's task group completion messages for testing purposes.
+   * Sends task state change event to scheduler.
+   * This replaces executor's task completion messages for testing purposes.
    * @param scheduler for the submitted job.
    * @param executorRegistry provides executor representers
-   * @param taskGroupId for the task group to change the state.
-   * @param newState for the task group.
+   * @param taskId for the task to change the state.
+   * @param newState for the task.
    * @param cause in the case of a recoverable failure.
    */
-  public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler,
-                                                        final ExecutorRegistry executorRegistry,
-                                                        final String taskGroupId,
-                                                        final TaskGroupState.State newState,
-                                                        final int attemptIdx,
-                                                        final TaskGroupState.RecoverableFailureCause cause) {
+  public static void sendTaskStateEventToScheduler(final Scheduler scheduler,
+                                                   final ExecutorRegistry executorRegistry,
+                                                   final String taskId,
+                                                   final TaskState.State newState,
+                                                   final int attemptIdx,
+                                                   final TaskState.RecoverableFailureCause cause) {
     ExecutorRepresenter scheduledExecutor;
     do {
-      scheduledExecutor = findExecutorForTaskGroup(executorRegistry, taskGroupId);
+      scheduledExecutor = findExecutorForTask(executorRegistry, taskId);
     } while (scheduledExecutor == null);
 
-    scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), taskGroupId,
+    scheduler.onTaskStateChanged(scheduledExecutor.getExecutorId(), taskId,
         newState, attemptIdx, null, cause);
   }
 
-  public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler,
-                                                        final ExecutorRegistry executorRegistry,
-                                                        final String taskGroupId,
-                                                        final TaskGroupState.State newState,
-                                                        final int attemptIdx) {
-    sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, newState, attemptIdx, null);
+  public static void sendTaskStateEventToScheduler(final Scheduler scheduler,
+                                                   final ExecutorRegistry executorRegistry,
+                                                   final String taskId,
+                                                   final TaskState.State newState,
+                                                   final int attemptIdx) {
+    sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, newState, attemptIdx, null);
   }
 
-  public static void mockSchedulerRunner(final PendingTaskGroupCollection pendingTaskGroupCollection,
+  public static void mockSchedulerRunner(final PendingTaskCollection pendingTaskCollection,
                                          final SchedulingPolicy schedulingPolicy,
                                          final JobStateManager jobStateManager,
                                          final ExecutorRegistry executorRegistry,
                                          final boolean isPartialSchedule) {
-    while (!pendingTaskGroupCollection.isEmpty()) {
-      final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupCollection.remove(
-          pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId());
+    while (!pendingTaskCollection.isEmpty()) {
+      final ScheduledTask taskToSchedule = pendingTaskCollection.remove(
+          pendingTaskCollection.peekSchedulableTasks().get().iterator().next().getTaskId());
 
       final Set<ExecutorRepresenter> runningExecutorRepresenter =
           executorRegistry.getRunningExecutorIds().stream()
               .map(executorId -> executorRegistry.getExecutorRepresenter(executorId))
               .collect(Collectors.toSet());
       final Set<ExecutorRepresenter> candidateExecutors =
-          schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, taskGroupToSchedule);
+          schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, taskToSchedule);
       if (candidateExecutors.size() > 0) {
-        jobStateManager.onTaskGroupStateChanged(taskGroupToSchedule.getTaskGroupId(),
-            TaskGroupState.State.EXECUTING);
+        jobStateManager.onTaskStateChanged(taskToSchedule.getTaskId(),
+            TaskState.State.EXECUTING);
         final ExecutorRepresenter executor = candidateExecutors.stream().findFirst().get();
-        executor.onTaskGroupScheduled(taskGroupToSchedule);
+        executor.onTaskScheduled(taskToSchedule);
       }
 
-      // Schedule only the first task group.
+      // Schedule only the first task.
       if (isPartialSchedule) {
         break;
       }
@@ -129,17 +129,17 @@ public final class SchedulerTestUtil {
   }
 
   /**
-   * Retrieves the executor to which the given task group was scheduled.
-   * @param taskGroupId of the task group to search.
+   * Retrieves the executor to which the given task was scheduled.
+   * @param taskId of the task to search.
    * @param executorRegistry provides executor representers
-   * @return the {@link ExecutorRepresenter} of the executor the task group was scheduled to.
+   * @return the {@link ExecutorRepresenter} of the executor the task was scheduled to.
    */
-  private static ExecutorRepresenter findExecutorForTaskGroup(final ExecutorRegistry executorRegistry,
-                                                              final String taskGroupId) {
+  private static ExecutorRepresenter findExecutorForTask(final ExecutorRegistry executorRegistry,
+                                                         final String taskId) {
     for (final String executorId : executorRegistry.getRunningExecutorIds()) {
       final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId);
-      if (executor.getRunningTaskGroups().contains(taskGroupId)
-          || executor.getCompleteTaskGroups().contains(taskGroupId)) {
+      if (executor.getRunningTasks().contains(taskId)
+          || executor.getCompleteTasks().contains(taskId)) {
         return executor;
       }
     }
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
similarity index 64%
rename from runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
index 2e86dbb..b7b7a2e 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskQueueTest.java
@@ -33,32 +33,32 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests {@link SingleJobTaskGroupCollection}.
+ * Tests {@link SingleJobTaskCollection}.
  */
-public final class SingleTaskGroupQueueTest {
-  private SingleJobTaskGroupCollection pendingTaskGroupPriorityQueue;
+public final class SingleTaskQueueTest {
+  private SingleJobTaskCollection pendingTaskPriorityQueue;
 
   /**
-   * To be used for a thread pool to execute task groups.
+   * To be used for a thread pool to execute tasks.
    */
   private ExecutorService executorService;
 
   @Before
   public void setUp() throws Exception{
-    pendingTaskGroupPriorityQueue = new SingleJobTaskGroupCollection();
+    pendingTaskPriorityQueue = new SingleJobTaskCollection();
     executorService = Executors.newFixedThreadPool(2);
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupCollection}.
-   * Tests whether the dequeued TaskGroups are according to the stage-dependency priority.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
+   * Tests whether the dequeued Tasks are according to the stage-dependency priority.
    */
   @Test
   public void testPushPriority() throws Exception {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, true);
 
-    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
     final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
@@ -70,10 +70,10 @@ public final class SingleTaskGroupQueueTest {
 
     // This mimics Batch Scheduler's behavior
     executorService.submit(() -> {
-      // First schedule the children TaskGroups (since it is push).
-      // BatchSingleJobScheduler will schedule TaskGroups in this order as well.
+      // First schedule the children Tasks (since it is push).
+      // BatchSingleJobScheduler will schedule Tasks in this order as well.
       scheduleStage(dagOf2Stages.get(1));
-      // Then, schedule the parent TaskGroups.
+      // Then, schedule the parent Tasks.
       scheduleStage(dagOf2Stages.get(0));
 
       countDownLatch.countDown();
@@ -83,15 +83,15 @@ public final class SingleTaskGroupQueueTest {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = dequeue();
-        assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
+        final ScheduledTask dequeuedTask = dequeue();
+        assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(1).getId());
 
-        // Let's say we fail to schedule, and add this TaskGroup back.
-        pendingTaskGroupPriorityQueue.add(dequeuedTaskGroup);
+        // Let's say we fail to schedule, and add this Task back.
+        pendingTaskPriorityQueue.add(dequeuedTask);
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(1).getId());
 
-        // Now that we've dequeued all of the children TaskGroups, we should now start getting the parents.
+        // Now that we've dequeued all of the children Tasks, we should now start getting the parents.
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
       } catch (Exception e) {
@@ -107,14 +107,14 @@ public final class SingleTaskGroupQueueTest {
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupCollection}.
-   * Tests whether the dequeued TaskGroups are according to the stage-dependency priority.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
+   * Tests whether the dequeued Tasks are according to the stage-dependency priority.
    */
   @Test
   public void testPullPriority() throws Exception {
     final PhysicalPlan physicalPlan =
         TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, false);
-    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
     final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
@@ -125,8 +125,8 @@ public final class SingleTaskGroupQueueTest {
 
     // This mimics Batch Scheduler's behavior
     executorService.submit(() -> {
-      // First schedule the parent TaskGroups (since it is pull).
-      // BatchSingleJobScheduler will schedule TaskGroups in this order as well.
+      // First schedule the parent Tasks (since it is pull).
+      // BatchSingleJobScheduler will schedule Tasks in this order as well.
       scheduleStage(dagOf2Stages.get(0));
       countDownLatch.countDown();
     }).get();
@@ -135,18 +135,18 @@ public final class SingleTaskGroupQueueTest {
     executorService.submit(() -> {
       try {
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
-        final ScheduledTaskGroup dequeuedTaskGroup = dequeue();
-        assertEquals(RuntimeIdGenerator.getStageIdFromTaskGroupId(dequeuedTaskGroup.getTaskGroupId()),
+        final ScheduledTask dequeuedTask = dequeue();
+        assertEquals(RuntimeIdGenerator.getStageIdFromTaskId(dequeuedTask.getTaskId()),
             dagOf2Stages.get(0).getId());
 
-        // Let's say we fail to schedule, and add this TaskGroup back.
-        pendingTaskGroupPriorityQueue.add(dequeuedTaskGroup);
+        // Let's say we fail to schedule, and add this Task back.
+        pendingTaskPriorityQueue.add(dequeuedTask);
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
 
-        // Now that we've dequeued all of the children TaskGroups, we should now schedule children.
+        // Now that we've dequeued all of the children Tasks, we should now schedule children.
         assertEquals(dequeueAndGetStageId(), dagOf2Stages.get(0).getId());
 
-        // Schedule the children TaskGroups.
+        // Schedule the children Tasks.
         scheduleStage(dagOf2Stages.get(1));
       } catch (Exception e) {
         e.printStackTrace();
@@ -159,15 +159,15 @@ public final class SingleTaskGroupQueueTest {
   }
 
   /**
-   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskGroupCollection}.
-   * Tests whether the dequeued TaskGroups are according to the stage-dependency priority,
-   * while concurrently scheduling TaskGroups that have dependencies, but are of different container types.
+   * This method builds a physical DAG starting from an IR DAG and submits it to {@link SingleJobTaskCollection}.
+   * Tests whether the dequeued Tasks are according to the stage-dependency priority,
+   * while concurrently scheduling Tasks that have dependencies, but are of different container types.
    */
   @Test
   public void testWithDifferentContainerType() throws Exception {
     final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan(
         TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, true);
-    pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan);
+    pendingTaskPriorityQueue.onJobScheduled(physicalPlan);
     final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort();
 
     // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements.
@@ -175,10 +175,10 @@ public final class SingleTaskGroupQueueTest {
 
     final CountDownLatch countDownLatch = new CountDownLatch(2);
 
-    // First schedule the children TaskGroups (since it is push).
-    // BatchSingleJobScheduler will schedule TaskGroups in this order as well.
+    // First schedule the children Tasks (since it is push).
+    // BatchSingleJobScheduler will schedule Tasks in this order as well.
     scheduleStage(dagOf2Stages.get(1));
-    // Then, schedule the parent TaskGroups.
+    // Then, schedule the parent Tasks.
     scheduleStage(dagOf2Stages.get(0));
 
     countDownLatch.countDown();
@@ -206,32 +206,32 @@ public final class SingleTaskGroupQueueTest {
   }
 
   /**
-   * Schedule the task groups in a physical stage.
+   * Schedule the tasks in a physical stage.
    * @param stage the stage to schedule.
    */
   private void scheduleStage(final PhysicalStage stage) {
-    stage.getTaskGroupIds().forEach(taskGroupId ->
-        pendingTaskGroupPriorityQueue.add(new ScheduledTaskGroup(
-            "TestPlan", stage.getSerializedTaskGroupDag(), taskGroupId, Collections.emptyList(),
+    stage.getTaskIds().forEach(taskId ->
+        pendingTaskPriorityQueue.add(new ScheduledTask(
+            "TestPlan", stage.getSerializedTaskDag(), taskId, Collections.emptyList(),
             Collections.emptyList(), 0, stage.getContainerType(), Collections.emptyMap())));
   }
 
   /**
-   * Dequeues a scheduled task group from the task group priority queue and get it's stage name.
-   * @return the stage name of the dequeued task group.
+   * Dequeues a scheduled task from the task priority queue and get it's stage name.
+   * @return the stage name of the dequeued task.
    */
   private String dequeueAndGetStageId() {
-    final ScheduledTaskGroup scheduledTaskGroup = dequeue();
-    return RuntimeIdGenerator.getStageIdFromTaskGroupId(scheduledTaskGroup.getTaskGroupId());
+    final ScheduledTask scheduledTask = dequeue();
+    return RuntimeIdGenerator.getStageIdFromTaskId(scheduledTask.getTaskId());
   }
 
   /**
-   * Dequeues a scheduled task group from the task group priority queue.
-   * @return the TaskGroup dequeued
+   * Dequeues a scheduled task from the task priority queue.
+   * @return the Task dequeued
    */
-  private ScheduledTaskGroup dequeue() {
-    final Collection<ScheduledTaskGroup> scheduledTaskGroups
-        = pendingTaskGroupPriorityQueue.peekSchedulableTaskGroups().get();
-    return pendingTaskGroupPriorityQueue.remove(scheduledTaskGroups.iterator().next().getTaskGroupId());
+  private ScheduledTask dequeue() {
+    final Collection<ScheduledTask> scheduledTasks
+        = pendingTaskPriorityQueue.peekSchedulableTasks().get();
+    return pendingTaskPriorityQueue.remove(scheduledTasks.iterator().next().getTaskId());
   }
 }
diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
index 6d5fc43..87aa123 100644
--- a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
+++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java
@@ -15,7 +15,7 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup;
+import edu.snu.nemo.runtime.common.plan.physical.ScheduledTask;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.junit.Test;
@@ -33,7 +33,7 @@ import static org.mockito.Mockito.*;
  * Test cases for {@link SourceLocationAwareSchedulingPolicy}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, ScheduledTaskGroup.class, Readable.class})
+@PrepareForTest({ExecutorRepresenter.class, ScheduledTask.class, Readable.class})
 public final class SourceLocationAwareSchedulingPolicyTest {
   private static final String SITE_0 = "SEOUL";
   private static final String SITE_1 = "JINJU";
@@ -46,7 +46,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   }
 
   /**
-   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ScheduledTaskGroup} when
+   * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link ScheduledTask} when
    * there are no executors in appropriate location(s).
    */
   @Test
@@ -54,13 +54,13 @@ public final class SourceLocationAwareSchedulingPolicyTest {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
 
     // Prepare test scenario
-    final ScheduledTaskGroup tg = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+    final ScheduledTask task = CreateScheduledTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_0)));
     final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1);
     final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1);
 
     assertEquals(Collections.emptySet(),
-        schedulingPolicy.filterExecutorRepresenters(new HashSet<>(Arrays.asList(e0, e1)), tg));
+        schedulingPolicy.filterExecutorRepresenters(new HashSet<>(Arrays.asList(e0, e1)), task));
   }
 
   /**
@@ -70,43 +70,43 @@ public final class SourceLocationAwareSchedulingPolicyTest {
   public void testSourceLocationAwareSchedulingWithMultiSource() {
     final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy();
     // Prepare test scenario
-    final ScheduledTaskGroup tg0 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+    final ScheduledTask task0 = CreateScheduledTask.withReadablesWithSourceLocations(
         Collections.singletonList(Collections.singletonList(SITE_1)));
-    final ScheduledTaskGroup tg1 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+    final ScheduledTask task1 = CreateScheduledTask.withReadablesWithSourceLocations(
         Collections.singletonList(Arrays.asList(SITE_0, SITE_1, SITE_2)));
-    final ScheduledTaskGroup tg2 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+    final ScheduledTask task2 = CreateScheduledTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_0), Collections.singletonList(SITE_1),
             Arrays.asList(SITE_1, SITE_2)));
-    final ScheduledTaskGroup tg3 = CreateScheduledTaskGroup.withReadablesWithSourceLocations(
+    final ScheduledTask task3 = CreateScheduledTask.withReadablesWithSourceLocations(
         Arrays.asList(Collections.singletonList(SITE_1), Collections.singletonList(SITE_0),
             Arrays.asList(SITE_0, SITE_2)));
 
     final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1);
-    for (final ScheduledTaskGroup tg : new HashSet<>(Arrays.asList(tg0, tg1, tg2, tg3))) {
+    for (final ScheduledTask task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) {
       assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters(
-          new HashSet<>(Collections.singletonList(e)), tg));
+          new HashSet<>(Collections.singletonList(e)), task));
     }
   }
 
 
   /**
-   * Utility for creating {@link ScheduledTaskGroup}.
+   * Utility for creating {@link ScheduledTask}.
    */
-  private static final class CreateScheduledTaskGroup {
-    private static final AtomicInteger taskGroupIndex = new AtomicInteger(0);
+  private static final class CreateScheduledTask {
     private static final AtomicInteger taskIndex = new AtomicInteger(0);
+    private static final AtomicInteger intraTaskIndex = new AtomicInteger(0);
 
-    private static ScheduledTaskGroup doCreate(final Collection<Readable> readables) {
-      final ScheduledTaskGroup mockInstance = mock(ScheduledTaskGroup.class);
+    private static ScheduledTask doCreate(final Collection<Readable> readables) {
+      final ScheduledTask mockInstance = mock(ScheduledTask.class);
       final Map<String, Readable> readableMap = new HashMap<>();
-      readables.forEach(readable -> readableMap.put(String.format("TASK-%d", taskIndex.getAndIncrement()),
+      readables.forEach(readable -> readableMap.put(String.format("TASK-%d", intraTaskIndex.getAndIncrement()),
           readable));
-      when(mockInstance.getTaskGroupId()).thenReturn(String.format("TG-%d", taskGroupIndex.getAndIncrement()));
+      when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", taskIndex.getAndIncrement()));
       when(mockInstance.getLogicalTaskIdToReadable()).thenReturn(readableMap);
       return mockInstance;
     }
 
-    static ScheduledTaskGroup withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
+    static ScheduledTask withReadablesWithSourceLocations(final Collection<List<String>> sourceLocation) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (final List<String> locations : sourceLocation) {
@@ -120,7 +120,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ScheduledTaskGroup withReadablesWithoutSourceLocations(final int numReadables) {
+    static ScheduledTask withReadablesWithoutSourceLocations(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -134,7 +134,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ScheduledTaskGroup withReadablesWhichThrowException(final int numReadables) {
+    static ScheduledTask withReadablesWhichThrowException(final int numReadables) {
       try {
         final List<Readable> readables = new ArrayList<>();
         for (int i = 0; i < numReadables; i++) {
@@ -148,7 +148,7 @@ public final class SourceLocationAwareSchedulingPolicyTest {
       }
     }
 
-    static ScheduledTaskGroup withoutReadables() {
+    static ScheduledTask withoutReadables() {
       return doCreate(Collections.emptyList());
     }
   }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index 2c251fd..68f9b12 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -79,7 +79,7 @@ public final class NemoBackendTest<I, O> {
     final PhysicalPlan executionPlan = backend.compile(dag, physicalPlanGenerator);
 
     assertEquals(2, executionPlan.getStageDAG().getVertices().size());
-    assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(0).getTaskGroupIds().size());
-    assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(1).getTaskGroupIds().size());
+    assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(0).getTaskIds().size());
+    assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(1).getTaskIds().size());
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 3861ee2..e85eff1 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -44,10 +44,10 @@ public class DataSkewRuntimePassTest {
    */
   @Test
   public void testDataSkewDynamicOptimizationPass() {
-    final Integer taskGroupListSize = 5;
+    final Integer taskListSize = 5;
 
     final List<KeyRange> keyRanges =
-        new DataSkewRuntimePass().calculateHashRanges(testMetricData, taskGroupListSize);
+        new DataSkewRuntimePass().calculateHashRanges(testMetricData, taskListSize);
 
     assertEquals(0, keyRanges.get(0).rangeBeginInclusive());
     assertEquals(3, keyRanges.get(0).rangeEndExclusive());
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
index d2304d7..611d821 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/common/plan/DAGConverterTest.java
@@ -103,8 +103,8 @@ public final class DAGConverterTest {
     assertEquals(physicalDAG.getOutgoingEdgesOf(physicalStage1).size(), 1);
     assertEquals(physicalDAG.getOutgoingEdgesOf(physicalStage2).size(), 0);
 
-    assertEquals(physicalStage1.getTaskGroupIds().size(), 3);
-    assertEquals(physicalStage2.getTaskGroupIds().size(), 2);
+    assertEquals(physicalStage1.getTaskIds().size(), 3);
+    assertEquals(physicalStage2.getTaskIds().size(), 2);
   }
 
   @Test
@@ -248,9 +248,9 @@ public final class DAGConverterTest {
 //    assertEquals(physicalDAG.getOutgoingEdgesOf(physicalStage1).size(), 1);
 //    assertEquals(physicalDAG.getOutgoingEdgesOf(physicalStage2).size(), 0);
 //
-//    final List<TaskGroup> taskGroupList1 = physicalStage1.getTaskGroupList();
-//    final List<TaskGroup> taskGroupList2 = physicalStage2.getTaskGroupList();
-//    assertEquals(taskGroupList1.size(), 3);
-//    assertEquals(taskGroupList2.size(), 2);
+//    final List<Task> taskList1 = physicalStage1.getTaskList();
+//    final List<Task> taskList2 = physicalStage2.getTaskList();
+//    assertEquals(taskList1.size(), 3);
+//    assertEquals(taskList2.size(), 2);
   }
 }
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
similarity index 86%
rename from tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
rename to tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
index 18a385d..affcbed 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskGroupExecutorTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/TaskExecutorTest.java
@@ -28,8 +28,8 @@ import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.executor.MetricMessageSender;
-import edu.snu.nemo.runtime.executor.TaskGroupExecutor;
-import edu.snu.nemo.runtime.executor.TaskGroupStateManager;
+import edu.snu.nemo.runtime.executor.TaskExecutor;
+import edu.snu.nemo.runtime.executor.TaskStateManager;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.datatransfer.DataTransferFactory;
 import edu.snu.nemo.runtime.executor.datatransfer.InputReader;
@@ -53,27 +53,27 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
 
 /**
- * Tests {@link TaskGroupExecutor}.
+ * Tests {@link TaskExecutor}.
  */
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({InputReader.class, OutputWriter.class, DataTransferFactory.class,
-    TaskGroupStateManager.class, PhysicalStageEdge.class})
-public final class TaskGroupExecutorTest {
+    TaskStateManager.class, PhysicalStageEdge.class})
+public final class TaskExecutorTest {
   private static final int DATA_SIZE = 100;
   private static final String CONTAINER_TYPE = "CONTAINER_TYPE";
   private static final int SOURCE_PARALLELISM = 5;
   private List elements;
   private Map<String, List<Object>> taskIdToOutputData;
   private DataTransferFactory dataTransferFactory;
-  private TaskGroupStateManager taskGroupStateManager;
+  private TaskStateManager taskStateManager;
   private MetricMessageSender metricMessageSender;
 
   @Before
   public void setUp() throws Exception {
     elements = getRangedNumList(0, DATA_SIZE);
 
-    // Mock a TaskGroupStateManager. It accumulates the state change into a list.
-    taskGroupStateManager = mock(TaskGroupStateManager.class);
+    // Mock a TaskStateManager. It accumulates the state change into a list.
+    taskStateManager = mock(TaskStateManager.class);
 
     // Mock a DataTransferFactory.
     taskIdToOutputData = new HashMap<>();
@@ -88,11 +88,10 @@ public final class TaskGroupExecutorTest {
   }
 
   /**
-   * Test the {@link BoundedSourceTask} processing in {@link TaskGroupExecutor}.
+   * Test the {@link BoundedSourceTask} processing in {@link TaskExecutor}.
    */
   @Test(timeout=2000)
   public void testSourceTask() throws Exception {
-    // Create a task group only having a source task.
     final IRVertex sourceIRVertex = new SimpleIRVertex();
     final String sourceIrVertexId = sourceIRVertex.getId();
 
@@ -117,15 +116,15 @@ public final class TaskGroupExecutorTest {
         new DAGBuilder<Task, RuntimeEdge<Task>>().addVertex(boundedSourceTask).build();
     final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(sourceIRVertex);
-    final String taskGroupId = RuntimeIdGenerator.generateTaskGroupId(0, stageId);
-    final ScheduledTaskGroup scheduledTaskGroup =
-        new ScheduledTaskGroup("testSourceTask", new byte[0], taskGroupId, Collections.emptyList(),
+    final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
+    final ScheduledTask scheduledTask =
+        new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.emptyList(),
             Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, logicalIdToReadable);
 
-    // Execute the task group.
-    final TaskGroupExecutor taskGroupExecutor = new TaskGroupExecutor(
-        scheduledTaskGroup, taskDag, taskGroupStateManager, dataTransferFactory, metricMessageSender);
-    taskGroupExecutor.execute();
+    // Execute the task.
+    final TaskExecutor taskExecutor = new TaskExecutor(
+        scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+    taskExecutor.execute();
 
     // Check the output.
     assertEquals(100, taskIdToOutputData.get(sourceTaskId).size());
@@ -133,9 +132,9 @@ public final class TaskGroupExecutorTest {
   }
 
   /**
-   * Test the {@link OperatorTask} processing in {@link TaskGroupExecutor}.
+   * Test the {@link OperatorTask} processing in {@link TaskExecutor}.
    *
-   * The DAG of the task group to test will looks like:
+   * The DAG of the task to test will looks like:
    * operator task 1 -> operator task 2
    *
    * The output data from upstream stage will be split
@@ -145,7 +144,6 @@ public final class TaskGroupExecutorTest {
    */
   @Test//(timeout=2000)
   public void testOperatorTask() throws Exception {
-    // Create a task group with two operator tasks.
     final IRVertex operatorIRVertex1 = new SimpleIRVertex();
     final IRVertex operatorIRVertex2 = new SimpleIRVertex();
     final String operatorIRVertexId1 = operatorIRVertex1.getId();
@@ -169,19 +167,19 @@ public final class TaskGroupExecutorTest {
         .connectVertices(new RuntimeEdge<Task>(
             runtimeIREdgeId, edgeProperties, operatorTask1, operatorTask2, coder))
         .build();
-    final String taskGroupId = RuntimeIdGenerator.generateTaskGroupId(0, stageId);
+    final String taskId = RuntimeIdGenerator.generateTaskId(0, stageId);
     final PhysicalStageEdge stageInEdge = mock(PhysicalStageEdge.class);
     when(stageInEdge.getDstVertex()).thenReturn(operatorIRVertex1);
     final PhysicalStageEdge stageOutEdge = mock(PhysicalStageEdge.class);
     when(stageOutEdge.getSrcVertex()).thenReturn(operatorIRVertex2);
-    final ScheduledTaskGroup scheduledTaskGroup =
-        new ScheduledTaskGroup("testSourceTask", new byte[0], taskGroupId, Collections.singletonList(stageInEdge),
+    final ScheduledTask scheduledTask =
+        new ScheduledTask("testSourceTask", new byte[0], taskId, Collections.singletonList(stageInEdge),
             Collections.singletonList(stageOutEdge), 0, CONTAINER_TYPE, Collections.emptyMap());
 
-    // Execute the task group.
-    final TaskGroupExecutor taskGroupExecutor = new TaskGroupExecutor(
-        scheduledTaskGroup, taskDag, taskGroupStateManager, dataTransferFactory, metricMessageSender);
-    taskGroupExecutor.execute();
+    // Execute the task.
+    final TaskExecutor taskExecutor = new TaskExecutor(
+        scheduledTask, taskDag, taskStateManager, dataTransferFactory, metricMessageSender);
+    taskExecutor.execute();
 
     // Check the output.
     assertEquals(100, taskIdToOutputData.get(operatorTaskId2).size());
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
index a89570d..025a393 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/executor/datatransfer/DataTransferTest.java
@@ -136,10 +136,10 @@ public final class DataTransferTest {
     final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class);
     final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class);
     final SchedulingPolicy schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class);
-    final PendingTaskGroupCollection taskGroupQueue = new SingleJobTaskGroupCollection();
-    final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskGroupQueue, executorRegistry);
+    final PendingTaskCollection taskQueue = new SingleJobTaskCollection();
+    final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry);
     final Scheduler scheduler =
-        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, taskGroupQueue, master,
+        new BatchSingleJobScheduler(schedulingPolicy, schedulerRunner, taskQueue, master,
             pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry);
     final AtomicInteger executorCount = new AtomicInteger(0);
 
@@ -328,11 +328,11 @@ public final class DataTransferTest {
     dummyEdge = new PhysicalStageEdge(edgeId, edgeProperties, srcMockVertex, dstMockVertex,
         srcStage, dstStage, CODER, false);
     // Initialize states in Master
-    srcStage.getTaskGroupIds().forEach(srcTaskGroupId -> {
+    srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
-          edgeId, RuntimeIdGenerator.getIndexFromTaskGroupId(srcTaskGroupId));
-      master.initializeState(blockId, srcTaskGroupId);
-      master.onProducerTaskGroupScheduled(srcTaskGroupId);
+          edgeId, RuntimeIdGenerator.getIndexFromTaskId(srcTaskId));
+      master.initializeState(blockId, srcTaskId);
+      master.onProducerTaskScheduled(srcTaskId);
     });
 
     // Write
@@ -419,14 +419,14 @@ public final class DataTransferTest {
     dummyEdge2 = new PhysicalStageEdge(edgeId2, edgeProperties, srcMockVertex, dstMockVertex2,
         srcStage, dstStage2, CODER, false);
     // Initialize states in Master
-    srcStage.getTaskGroupIds().forEach(srcTaskGroupId -> {
+    srcStage.getTaskIds().forEach(srcTaskId -> {
       final String blockId = RuntimeIdGenerator.generateBlockId(
-          edgeId, RuntimeIdGenerator.getIndexFromTaskGroupId(srcTaskGroupId));
-      master.initializeState(blockId, srcTaskGroupId);
+          edgeId, RuntimeIdGenerator.getIndexFromTaskId(srcTaskId));
+      master.initializeState(blockId, srcTaskId);
       final String blockId2 = RuntimeIdGenerator.generateBlockId(
-          edgeId2, RuntimeIdGenerator.getIndexFromTaskGroupId(srcTaskGroupId));
-      master.initializeState(blockId2, srcTaskGroupId);
-      master.onProducerTaskGroupScheduled(srcTaskGroupId);
+          edgeId2, RuntimeIdGenerator.getIndexFromTaskId(srcTaskId));
+      master.initializeState(blockId2, srcTaskId);
+      master.onProducerTaskScheduled(srcTaskId);
     });
 
     // Write
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
index cac9e61..c08815c 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/BlockManagerMasterTest.java
@@ -84,17 +84,17 @@ public final class BlockManagerMasterTest {
   public void testLostAfterCommit() throws Exception {
     final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-0");
     final int srcTaskIndex = 0;
-    final String taskGroupId = RuntimeIdGenerator.generateTaskGroupId(srcTaskIndex, "Stage-test");
+    final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-test");
     final String executorId = RuntimeIdGenerator.generateExecutorId();
     final String blockId = RuntimeIdGenerator.generateBlockId(edgeId, srcTaskIndex);
 
     // Initially the block state is READY.
-    blockManagerMaster.initializeState(blockId, taskGroupId);
+    blockManagerMaster.initializeState(blockId, taskId);
     checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
         BlockState.State.READY);
 
     // The block is being SCHEDULED.
-    blockManagerMaster.onProducerTaskGroupScheduled(taskGroupId);
+    blockManagerMaster.onProducerTaskScheduled(taskId);
     final Future<String> future = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
     checkPendingFuture(future);
 
@@ -110,33 +110,33 @@ public final class BlockManagerMasterTest {
   }
 
   /**
-   * Test scenario where producer task group fails.
+   * Test scenario where producer task fails.
    * @throws Exception
    */
   @Test
   public void testBeforeAfterCommit() throws Exception {
     final String edgeId = RuntimeIdGenerator.generateRuntimeEdgeId("Edge-1");
     final int srcTaskIndex = 0;
-    final String taskGroupId = RuntimeIdGenerator.generateTaskGroupId(srcTaskIndex, "Stage-Test");
+    final String taskId = RuntimeIdGenerator.generateTaskId(srcTaskIndex, "Stage-Test");
     final String executorId = RuntimeIdGenerator.generateExecutorId();
     final String blockId = RuntimeIdGenerator.generateBlockId(edgeId, srcTaskIndex);
 
     // The block is being scheduled.
-    blockManagerMaster.initializeState(blockId, taskGroupId);
-    blockManagerMaster.onProducerTaskGroupScheduled(taskGroupId);
+    blockManagerMaster.initializeState(blockId, taskId);
+    blockManagerMaster.onProducerTaskScheduled(taskId);
     final Future<String> future0 = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
     checkPendingFuture(future0);
 
-    // Producer task group fails.
-    blockManagerMaster.onProducerTaskGroupFailed(taskGroupId);
+    // Producer task fails.
+    blockManagerMaster.onProducerTaskFailed(taskId);
 
     // A future, previously pending on SCHEDULED state, is now completed exceptionally.
     checkBlockAbsentException(future0, blockId, BlockState.State.LOST_BEFORE_COMMIT);
     checkBlockAbsentException(blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture(), blockId,
         BlockState.State.LOST_BEFORE_COMMIT);
 
-    // Re-scheduling the taskGroup.
-    blockManagerMaster.onProducerTaskGroupScheduled(taskGroupId);
+    // Re-scheduling the task.
+    blockManagerMaster.onProducerTaskScheduled(taskId);
     final Future<String> future1 = blockManagerMaster.getBlockLocationHandler(blockId).getLocationFuture();
     checkPendingFuture(future1);
 
diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
index d660033..bdc62e5 100644
--- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
+++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/master/JobStateManagerTest.java
@@ -32,7 +32,7 @@ import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
 import edu.snu.nemo.runtime.common.plan.physical.*;
 import edu.snu.nemo.runtime.common.state.JobState;
 import edu.snu.nemo.runtime.common.state.StageState;
-import edu.snu.nemo.runtime.common.state.TaskGroupState;
+import edu.snu.nemo.runtime.common.state.TaskState;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.runtime.master.JobStateManager;
@@ -135,18 +135,18 @@ public final class JobStateManagerTest {
     for (int stageIdx = 0; stageIdx < stageList.size(); stageIdx++) {
       final PhysicalStage physicalStage = stageList.get(stageIdx);
       jobStateManager.onStageStateChanged(physicalStage.getId(), StageState.State.EXECUTING);
-      final List<String> taskGroupIds = physicalStage.getTaskGroupIds();
-      taskGroupIds.forEach(taskGroupId -> {
-        jobStateManager.onTaskGroupStateChanged(taskGroupId, TaskGroupState.State.EXECUTING);
-        jobStateManager.onTaskGroupStateChanged(taskGroupId, TaskGroupState.State.COMPLETE);
-        if (RuntimeIdGenerator.getIndexFromTaskGroupId(taskGroupId) == taskGroupIds.size() - 1) {
+      final List<String> taskIds = physicalStage.getTaskIds();
+      taskIds.forEach(taskId -> {
+        jobStateManager.onTaskStateChanged(taskId, TaskState.State.EXECUTING);
+        jobStateManager.onTaskStateChanged(taskId, TaskState.State.COMPLETE);
+        if (RuntimeIdGenerator.getIndexFromTaskId(taskId) == taskIds.size() - 1) {
           assertTrue(jobStateManager.checkStageCompletion(physicalStage.getId()));
         }
       });
-      final Map<String, TaskGroupState> taskGroupStateMap = jobStateManager.getIdToTaskGroupStates();
-      taskGroupIds.forEach(taskGroupId -> {
-        assertEquals(taskGroupStateMap.get(taskGroupId).getStateMachine().getCurrentState(),
-            TaskGroupState.State.COMPLETE);
+      final Map<String, TaskState> taskStateMap = jobStateManager.getIdToTaskStates();
+      taskIds.forEach(taskId -> {
+        assertEquals(taskStateMap.get(taskId).getStateMachine().getCurrentState(),
+            TaskState.State.COMPLETE);
       });
 
       if (stageIdx == stageList.size() - 1) {

-- 
To stop receiving notification emails like this one, please contact
jeongyoon@apache.org.

Mime
View raw message