tajo-commits mailing list archives

From jihoon...@apache.org
Subject [09/13] tajo git commit: TAJO-324: Rename the prefix 'QueryUnit' to Task.
Date Fri, 19 Dec 2014 12:48:57 GMT
TAJO-324: Rename the prefix 'QueryUnit' to Task.

Closes #306


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5c852b79
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5c852b79
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5c852b79

Branch: refs/heads/index_support
Commit: 5c852b7984ba96cd60d528dd2086e2745cc8e3f5
Parents: ff57c77
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Thu Dec 18 12:55:25 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Thu Dec 18 12:55:25 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   5 +-
 .../java/org/apache/tajo/QueryIdFactory.java    |  12 +-
 .../org/apache/tajo/QueryUnitAttemptId.java     |  94 --
 .../main/java/org/apache/tajo/QueryUnitId.java  |  90 --
 .../java/org/apache/tajo/TaskAttemptId.java     |  94 ++
 .../src/main/java/org/apache/tajo/TaskId.java   |  90 ++
 .../java/org/apache/tajo/util/TajoIdUtils.java  |   8 +-
 tajo-common/src/main/proto/TajoIdProtos.proto   |   6 +-
 .../physical/HashShuffleFileWriteExec.java      |   3 +-
 .../engine/planner/physical/PhysicalExec.java   |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   2 +-
 .../tajo/engine/query/QueryUnitRequest.java     |  51 --
 .../tajo/engine/query/QueryUnitRequestImpl.java | 328 -------
 .../apache/tajo/engine/query/TaskRequest.java   |  51 ++
 .../tajo/engine/query/TaskRequestImpl.java      | 328 +++++++
 .../tajo/master/DefaultTaskScheduler.java       | 162 ++--
 .../apache/tajo/master/LazyTaskScheduler.java   |  46 +-
 .../master/NonForwardQueryResultScanner.java    |   6 +-
 .../apache/tajo/master/TajoContainerProxy.java  |   4 +-
 .../event/ContainerAllocatorEventType.java      |   2 +-
 .../tajo/master/event/LocalTaskEvent.java       |   8 +-
 .../event/QueryUnitAttemptScheduleEvent.java    |  87 --
 .../tajo/master/event/SubQueryEventType.java    |   2 +-
 .../tajo/master/event/SubQueryTaskEvent.java    |   8 +-
 .../master/event/TaskAttemptAssignedEvent.java  |   4 +-
 .../tajo/master/event/TaskAttemptEvent.java     |   8 +-
 .../master/event/TaskAttemptScheduleEvent.java  |   4 +-
 .../event/TaskAttemptStatusUpdateEvent.java     |   4 +-
 .../event/TaskAttemptToSchedulerEvent.java      |  87 ++
 .../tajo/master/event/TaskCompletionEvent.java  |   4 +-
 .../org/apache/tajo/master/event/TaskEvent.java |   8 +-
 .../tajo/master/event/TaskFatalErrorEvent.java  |   6 +-
 .../tajo/master/event/TaskRequestEvent.java     |   9 +-
 .../tajo/master/event/TaskTAttemptEvent.java    |  10 +-
 .../querymaster/QueryMasterManagerService.java  |  22 +-
 .../master/querymaster/QueryMasterTask.java     |  14 +-
 .../tajo/master/querymaster/QueryUnit.java      | 907 -------------------
 .../master/querymaster/QueryUnitAttempt.java    | 443 ---------
 .../tajo/master/querymaster/Repartitioner.java  |  14 +-
 .../tajo/master/querymaster/SubQuery.java       |  72 +-
 .../apache/tajo/master/querymaster/Task.java    | 907 +++++++++++++++++++
 .../tajo/master/querymaster/TaskAttempt.java    | 443 +++++++++
 .../main/java/org/apache/tajo/util/JSPUtil.java |  84 +-
 .../apache/tajo/util/history/HistoryReader.java |  22 +-
 .../apache/tajo/util/history/HistoryWriter.java |   4 +-
 .../tajo/util/history/QueryUnitHistory.java     | 167 ----
 .../tajo/util/history/SubQueryHistory.java      |  22 +-
 .../apache/tajo/util/history/TaskHistory.java   | 167 ++++
 .../tajo/worker/ExecutionBlockContext.java      |  12 +-
 .../java/org/apache/tajo/worker/FetchImpl.java  |  19 +-
 .../apache/tajo/worker/InterDataRetriever.java  |   8 +-
 .../tajo/worker/TajoWorkerManagerService.java   |   8 +-
 .../main/java/org/apache/tajo/worker/Task.java  |  31 +-
 .../apache/tajo/worker/TaskAttemptContext.java  |  12 +-
 .../org/apache/tajo/worker/TaskHistory.java     |  18 +-
 .../java/org/apache/tajo/worker/TaskRunner.java |  18 +-
 .../apache/tajo/worker/TaskRunnerHistory.java   |  16 +-
 .../apache/tajo/worker/TaskRunnerManager.java   |  10 +-
 .../retriever/AdvancedDataRetriever.java        |  14 +-
 .../src/main/proto/QueryMasterProtocol.proto    |   2 +-
 .../src/main/proto/TajoWorkerProtocol.proto     |  22 +-
 .../main/resources/webapps/admin/querytasks.jsp |  70 +-
 .../main/resources/webapps/admin/queryunit.jsp  | 134 ---
 .../src/main/resources/webapps/admin/task.jsp   | 134 +++
 .../resources/webapps/worker/querytasks.jsp     |  70 +-
 .../main/resources/webapps/worker/queryunit.jsp | 175 ----
 .../src/main/resources/webapps/worker/task.jsp  | 174 ++++
 .../resources/webapps/worker/taskdetail.jsp     |  14 +-
 .../resources/webapps/worker/taskhistory.jsp    |   6 +-
 .../src/main/resources/webapps/worker/tasks.jsp |  14 +-
 .../apache/tajo/LocalTajoTestingUtility.java    |  10 +-
 .../org/apache/tajo/TestQueryIdFactory.java     |   6 +-
 .../test/java/org/apache/tajo/TestTajoIds.java  |  32 +-
 .../org/apache/tajo/client/TestTajoClient.java  |  12 +-
 .../planner/physical/TestBNLJoinExec.java       |   4 +-
 .../planner/physical/TestBSTIndexExec.java      |   2 +-
 .../planner/physical/TestExternalSortExec.java  |   2 +-
 .../physical/TestFullOuterHashJoinExec.java     |   8 +-
 .../physical/TestFullOuterMergeJoinExec.java    |  12 +-
 .../planner/physical/TestHashAntiJoinExec.java  |   2 +-
 .../planner/physical/TestHashJoinExec.java      |   4 +-
 .../planner/physical/TestHashSemiJoinExec.java  |   2 +-
 .../physical/TestLeftOuterHashJoinExec.java     |  10 +-
 .../physical/TestLeftOuterNLJoinExec.java       |  10 +-
 .../planner/physical/TestMergeJoinExec.java     |   2 +-
 .../engine/planner/physical/TestNLJoinExec.java |   4 +-
 .../planner/physical/TestPhysicalPlanner.java   |  60 +-
 .../physical/TestProgressExternalSortExec.java  |   2 +-
 .../physical/TestRightOuterHashJoinExec.java    |   6 +-
 .../physical/TestRightOuterMergeJoinExec.java   |  12 +-
 .../engine/planner/physical/TestSortExec.java   |   2 +-
 .../tajo/engine/query/TestGroupByQuery.java     |   4 +-
 .../apache/tajo/master/TestRepartitioner.java   |  21 +-
 .../querymaster/TestIntermediateEntry.java      |   2 +-
 .../querymaster/TestQueryUnitStatusUpdate.java  | 194 ----
 .../querymaster/TestTaskStatusUpdate.java       | 194 ++++
 .../java/org/apache/tajo/util/TestJSPUtil.java  |  54 +-
 .../util/history/TestHistoryWriterReader.java   |  39 +-
 .../org/apache/tajo/worker/TestHistory.java     |   8 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   4 +-
 .../worker/dataserver/TestHttpDataServer.java   |  12 +-
 .../queries/TestQueryUnitStatusUpdate/case1.sql |   1 -
 .../queries/TestQueryUnitStatusUpdate/case2.sql |   5 -
 .../queries/TestQueryUnitStatusUpdate/case3.sql |  10 -
 .../queries/TestTaskStatusUpdate/case1.sql      |   1 +
 .../queries/TestTaskStatusUpdate/case2.sql      |   5 +
 .../queries/TestTaskStatusUpdate/case3.sql      |  10 +
 tajo-dist/pom.xml                               |   4 +-
 .../tajo/pullserver/HttpDataServerHandler.java  |   2 +-
 .../retriever/AdvancedDataRetriever.java        |  16 +-
 .../org/apache/tajo/storage/StorageManager.java |   6 +-
 .../org/apache/tajo/storage/StorageUtil.java    |   4 +-
 .../storage/hbase/AbstractHBaseAppender.java    |   6 +-
 .../tajo/storage/hbase/HBasePutAppender.java    |   4 +-
 .../tajo/storage/hbase/HBaseStorageManager.java |   2 +-
 .../tajo/storage/hbase/HFileAppender.java       |   8 +-
 .../java/org/apache/tajo/storage/CSVFile.java   |   4 +-
 .../org/apache/tajo/storage/FileAppender.java   |   6 +-
 .../apache/tajo/storage/FileStorageManager.java |   8 +-
 .../tajo/storage/HashShuffleAppender.java       |  12 +-
 .../storage/HashShuffleAppenderManager.java     |   6 +-
 .../java/org/apache/tajo/storage/RawFile.java   |   4 +-
 .../java/org/apache/tajo/storage/RowFile.java   |   4 +-
 .../apache/tajo/storage/avro/AvroAppender.java  |   4 +-
 .../tajo/storage/parquet/ParquetAppender.java   |   4 +-
 .../org/apache/tajo/storage/rcfile/RCFile.java  |   4 +-
 .../sequencefile/SequenceFileAppender.java      |   4 +-
 .../tajo/storage/text/DelimitedTextFile.java    |   5 +-
 128 files changed, 3390 insertions(+), 3397 deletions(-)
----------------------------------------------------------------------
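For orientation, here is a minimal sketch (an editor's illustration, not part of the commit) of how a call site changes under this rename, using only the QueryIdFactory and id classes shown in the diffs below. The class name TaskIdRenameExample is hypothetical.

  import org.apache.tajo.ExecutionBlockId;
  import org.apache.tajo.QueryIdFactory;
  import org.apache.tajo.TaskAttemptId;
  import org.apache.tajo.TaskId;

  // Hypothetical helper, not part of this commit: shows the renamed factory methods.
  public class TaskIdRenameExample {
    public static TaskAttemptId firstAttempt(ExecutionBlockId ebId) {
      // Before this commit:
      //   QueryUnitId unit = QueryIdFactory.newQueryUnitId(ebId);
      //   QueryUnitAttemptId attempt = QueryIdFactory.newQueryUnitAttemptId(unit, 0);
      // After this commit the same ids are created through the Task* names:
      TaskId taskId = QueryIdFactory.newTaskId(ebId);
      return QueryIdFactory.newTaskAttemptId(taskId, 0);
    }
  }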


http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 6b9146a..2230d0f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -24,7 +24,8 @@ Release 0.9.1 - unreleased
 
   IMPROVEMENT
 
-    TAJO-1221: HA TajoClient should not connect TajoMaster at the first. (jaehwa)
+    TAJO-1221: HA TajoClient should not connect TajoMaster at the first. 
+    (jaehwa)
 
     TAJO-1241: Change default client and table time zone behavior. (hyunsik)
 
@@ -224,6 +225,8 @@ Release 0.9.1 - unreleased
 
   SUB TASKS
 
+    TAJO-324: Rename the prefix 'QueryUnit' to Task. (hyunsik)
+
     TAJO-1151: Implement the ByteBuffer-based De/Serializer. (jinho)
 
     TAJO-1152: RawFile ByteBuffer should be reuse. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java b/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
index 8ac0d54..9599007 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryIdFactory.java
@@ -92,15 +92,15 @@ public class QueryIdFactory {
     return new ExecutionBlockId(queryId, id);
   }
 
-  public synchronized static QueryUnitId newQueryUnitId(ExecutionBlockId executionBlockId) {
-    return new QueryUnitId(executionBlockId, nextId.incrementAndGet());
+  public synchronized static TaskId newTaskId(ExecutionBlockId executionBlockId) {
+    return new TaskId(executionBlockId, nextId.incrementAndGet());
   }
 
-  public synchronized static QueryUnitId newQueryUnitId(ExecutionBlockId executionBlockId, int id) {
-    return new QueryUnitId(executionBlockId, id);
+  public synchronized static TaskId newTaskId(ExecutionBlockId executionBlockId, int id) {
+    return new TaskId(executionBlockId, id);
   }
 
-  public synchronized static QueryUnitAttemptId newQueryUnitAttemptId(QueryUnitId queryUnitId, final int attemptId) {
-    return new QueryUnitAttemptId(queryUnitId, attemptId);
+  public synchronized static TaskAttemptId newTaskAttemptId(TaskId taskId, final int attemptId) {
+    return new TaskAttemptId(taskId, attemptId);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
deleted file mode 100644
index a9fd68b..0000000
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitAttemptId.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import com.google.common.base.Objects;
-
-public class QueryUnitAttemptId implements Comparable<QueryUnitAttemptId> {
-  public static final String QUA_ID_PREFIX = "ta";
-
-  private QueryUnitId queryUnitId;
-  private int id;
-
-  public QueryUnitId getQueryUnitId() {
-    return queryUnitId;
-  }
-
-  public int getId() {
-    return id;
-  }
-
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  public QueryUnitAttemptId(QueryUnitId queryUnitId, int id) {
-    this.queryUnitId = queryUnitId;
-    this.id = id;
-  }
-
-  public QueryUnitAttemptId(TajoIdProtos.QueryUnitAttemptIdProto proto) {
-    this(new QueryUnitId(proto.getQueryUnitId()), proto.getId());
-  }
-
-  public TajoIdProtos.QueryUnitAttemptIdProto getProto() {
-    return TajoIdProtos.QueryUnitAttemptIdProto.newBuilder()
-        .setQueryUnitId(queryUnitId.getProto())
-        .setId(id)
-        .build();
-  }
-
-  @Override
-  public int compareTo(QueryUnitAttemptId queryUnitAttemptId) {
-    int result = queryUnitId.compareTo(queryUnitAttemptId.queryUnitId);
-    if (result == 0) {
-      return id - queryUnitAttemptId.id;
-    } else {
-      return result;
-    }
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null) {
-      return false;
-    }
-    if (this == obj) {
-      return true;
-    }
-    if(!(obj instanceof QueryUnitAttemptId)) {
-      return false;
-    }
-    return compareTo((QueryUnitAttemptId)obj) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(queryUnitId, id);
-  }
-
-  @Override
-  public String toString() {
-    return QUA_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix();
-  }
-
-  public String toStringNoPrefix() {
-    return queryUnitId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.ATTEMPT_ID_FORMAT.format(id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java b/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
deleted file mode 100644
index da0479b..0000000
--- a/tajo-common/src/main/java/org/apache/tajo/QueryUnitId.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import com.google.common.base.Objects;
-
-public class QueryUnitId implements Comparable<QueryUnitId> {
-  public static final String QU_ID_PREFIX = "t";
-
-  private ExecutionBlockId executionBlockId;
-  private int id;
-
-  public QueryUnitId(ExecutionBlockId executionBlockId, int id) {
-    this.executionBlockId = executionBlockId;
-    this.id = id;
-  }
-
-  public QueryUnitId(TajoIdProtos.QueryUnitIdProto proto) {
-    this(new ExecutionBlockId(proto.getExecutionBlockId()), proto.getId());
-  }
-
-  public ExecutionBlockId getExecutionBlockId() {
-    return executionBlockId;
-  }
-
-  public int getId() {
-    return id;
-  }
-
-  public TajoIdProtos.QueryUnitIdProto getProto() {
-    return TajoIdProtos.QueryUnitIdProto.newBuilder()
-        .setExecutionBlockId(executionBlockId.getProto())
-        .setId(id)
-        .build();
-  }
-
-  @Override
-  public int compareTo(QueryUnitId queryUnitId) {
-    int result = executionBlockId.compareTo(queryUnitId.executionBlockId);
-    if (result == 0) {
-      return id - queryUnitId.id;
-    } else {
-      return result;
-    }
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj == null) {
-      return false;
-    }
-    if (this == obj) {
-      return true;
-    }
-    if (!(obj instanceof QueryUnitId)) {
-      return false;
-    }
-    return compareTo((QueryUnitId) obj) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(executionBlockId, id);
-  }
-
-  @Override
-  public String toString() {
-    return QU_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix();
-  }
-
-  public String toStringNoPrefix() {
-    return executionBlockId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.QU_ID_FORMAT.format(id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java b/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java
new file mode 100644
index 0000000..78c6325
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/TaskAttemptId.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.common.base.Objects;
+
+public class TaskAttemptId implements Comparable<TaskAttemptId> {
+  public static final String QUA_ID_PREFIX = "ta";
+
+  private TaskId taskId;
+  private int id;
+
+  public TaskId getTaskId() {
+    return taskId;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  public TaskAttemptId(TaskId taskId, int id) {
+    this.taskId = taskId;
+    this.id = id;
+  }
+
+  public TaskAttemptId(TajoIdProtos.TaskAttemptIdProto proto) {
+    this(new TaskId(proto.getTaskId()), proto.getId());
+  }
+
+  public TajoIdProtos.TaskAttemptIdProto getProto() {
+    return TajoIdProtos.TaskAttemptIdProto.newBuilder()
+        .setTaskId(taskId.getProto())
+        .setId(id)
+        .build();
+  }
+
+  @Override
+  public int compareTo(TaskAttemptId taskAttemptId) {
+    int result = taskId.compareTo(taskAttemptId.taskId);
+    if (result == 0) {
+      return id - taskAttemptId.id;
+    } else {
+      return result;
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if(!(obj instanceof TaskAttemptId)) {
+      return false;
+    }
+    return compareTo((TaskAttemptId)obj) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(taskId, id);
+  }
+
+  @Override
+  public String toString() {
+    return QUA_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix();
+  }
+
+  public String toStringNoPrefix() {
+    return taskId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.ATTEMPT_ID_FORMAT.format(id);
+  }
+}
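As a small illustration (not part of the commit), the renamed proto conversion added above can be exercised like this; TaskIdProtoExample is a hypothetical name, and the conversion uses only the constructor and getProto() shown in TaskAttemptId.java.

  import org.apache.tajo.TajoIdProtos;
  import org.apache.tajo.TaskAttemptId;

  // Hypothetical: the renamed proto message round-trips through the new id class.
  public class TaskIdProtoExample {
    public static TaskAttemptId roundTrip(TaskAttemptId attempt) {
      TajoIdProtos.TaskAttemptIdProto proto = attempt.getProto(); // was QueryUnitAttemptIdProto
      return new TaskAttemptId(proto);                            // equals(attempt) holds
    }
  }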

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/TaskId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/TaskId.java b/tajo-common/src/main/java/org/apache/tajo/TaskId.java
new file mode 100644
index 0000000..e1db67d
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/TaskId.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.common.base.Objects;
+
+public class TaskId implements Comparable<TaskId> {
+  public static final String QU_ID_PREFIX = "t";
+
+  private ExecutionBlockId executionBlockId;
+  private int id;
+
+  public TaskId(ExecutionBlockId executionBlockId, int id) {
+    this.executionBlockId = executionBlockId;
+    this.id = id;
+  }
+
+  public TaskId(TajoIdProtos.TaskIdProto proto) {
+    this(new ExecutionBlockId(proto.getExecutionBlockId()), proto.getId());
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public int getId() {
+    return id;
+  }
+
+  public TajoIdProtos.TaskIdProto getProto() {
+    return TajoIdProtos.TaskIdProto.newBuilder()
+        .setExecutionBlockId(executionBlockId.getProto())
+        .setId(id)
+        .build();
+  }
+
+  @Override
+  public int compareTo(TaskId taskId) {
+    int result = executionBlockId.compareTo(taskId.executionBlockId);
+    if (result == 0) {
+      return id - taskId.id;
+    } else {
+      return result;
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TaskId)) {
+      return false;
+    }
+    return compareTo((TaskId) obj) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(executionBlockId, id);
+  }
+
+  @Override
+  public String toString() {
+    return QU_ID_PREFIX + QueryId.SEPARATOR + toStringNoPrefix();
+  }
+
+  public String toStringNoPrefix() {
+    return executionBlockId.toStringNoPrefix() + QueryId.SEPARATOR + QueryIdFactory.QU_ID_FORMAT.format(id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
index 978af6f..cc0f854 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/TajoIdUtils.java
@@ -20,8 +20,8 @@ package org.apache.tajo.util;
 
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
 
 import java.text.DecimalFormat;
 
@@ -34,10 +34,10 @@ public class TajoIdUtils {
     return new ExecutionBlockId(new QueryId(tokens[1], Integer.parseInt(tokens[2])), Integer.parseInt(tokens[3]));
   }
 
-  public static QueryUnitAttemptId parseQueryUnitAttemptId(String idStr) {
+  public static TaskAttemptId parseTaskAttemptId(String idStr) {
     String[] tokens = idStr.split("_");
 
-    return new QueryUnitAttemptId(new QueryUnitId(
+    return new TaskAttemptId(new TaskId(
         new ExecutionBlockId(new QueryId(tokens[1], Integer.parseInt(tokens[2])), Integer.parseInt(tokens[3])),
         Integer.parseInt(tokens[4])), Integer.parseInt(tokens[5]));
   }
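A hedged usage sketch (not part of the commit): assuming the ids keep Tajo's usual underscore-separated string form that parseTaskAttemptId expects, a renamed attempt id can be round-tripped through TajoIdUtils. The class name TaskIdParseExample is hypothetical.

  import org.apache.tajo.ExecutionBlockId;
  import org.apache.tajo.QueryIdFactory;
  import org.apache.tajo.TaskAttemptId;
  import org.apache.tajo.util.TajoIdUtils;

  // Hypothetical round trip: serialize an attempt id and parse it back.
  public class TaskIdParseExample {
    public static void main(String[] args) {
      ExecutionBlockId ebId =
          QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
      TaskAttemptId attempt =
          QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId, 0), 0);
      TaskAttemptId parsed = TajoIdUtils.parseTaskAttemptId(attempt.toString());
      System.out.println(parsed.equals(attempt));   // expected: true
    }
  }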

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-common/src/main/proto/TajoIdProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/TajoIdProtos.proto b/tajo-common/src/main/proto/TajoIdProtos.proto
index 1fb8bbd..c5e80a6 100644
--- a/tajo-common/src/main/proto/TajoIdProtos.proto
+++ b/tajo-common/src/main/proto/TajoIdProtos.proto
@@ -31,13 +31,13 @@ message ExecutionBlockIdProto {
     required int32 id = 2;
 }
 
-message QueryUnitIdProto {
+message TaskIdProto {
     required ExecutionBlockIdProto executionBlockId = 1;
     required int32 id = 2;
 }
 
-message QueryUnitAttemptIdProto {
-  required QueryUnitIdProto queryUnitId = 1;
+message TaskAttemptIdProto {
+  required TaskIdProto taskId = 1;
   required int32 id = 2;
 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index d051fb6..3c4949f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -27,7 +27,6 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.plan.logical.ShuffleFileWriteNode;
-import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.HashShuffleAppender;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.Tuple;
@@ -87,7 +86,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
     HashShuffleAppender appender = appenderMap.get(partId);
     if (appender == null) {
       appender = hashShuffleAppenderManager.getAppender(context.getConf(),
-          context.getQueryId().getQueryUnitId().getExecutionBlockId(), partId, meta, outSchema);
+          context.getQueryId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
       appenderMap.put(partId, appender);
     }
     return appender;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 67fb29b..de14c9a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -79,7 +79,7 @@ public abstract class PhysicalExec implements SchemaObject {
   }
 
   protected Path getExecutorTmpDir() {
-    return new Path(context.getQueryId().getQueryUnitId().getExecutionBlockId().getQueryId().toString(),
+    return new Path(context.getQueryId().getTaskId().getExecutionBlockId().getQueryId().toString(),
         UUID.randomUUID().toString());
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index f507988..94cd4ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -88,7 +88,7 @@ public class SeqScanExec extends PhysicalExec {
       }
 
       cacheKey = new TupleCacheKey(
-          context.getTaskId().getQueryUnitId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey);
+          context.getTaskId().getTaskId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey);
     }
 
     if (fragments != null

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
deleted file mode 100644
index 3b0d60d..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.engine.query;
-
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.common.ProtoObject;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.util.List;
-
-public interface QueryUnitRequest extends ProtoObject<TajoWorkerProtocol.QueryUnitRequestProto> {
-
-	public QueryUnitAttemptId getId();
-	public List<CatalogProtos.FragmentProto> getFragments();
-	public String getOutputTableId();
-	public boolean isClusteredOutput();
-	public String getSerializedData();
-	public boolean isInterQuery();
-	public void setInterQuery();
-	public void addFetch(String name, FetchImpl fetch);
-	public List<FetchImpl> getFetches();
-  public boolean shouldDie();
-  public void setShouldDie();
-  public QueryContext getQueryContext(TajoConf conf);
-  public DataChannel getDataChannel();
-  public Enforcer getEnforcer();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
deleted file mode 100644
index 1b89afd..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryUnitRequestImpl.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.engine.query;
-
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.enforce.Enforcer;
-import org.apache.tajo.engine.planner.global.DataChannel;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
-import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProtoOrBuilder;
-import org.apache.tajo.worker.FetchImpl;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class QueryUnitRequestImpl implements QueryUnitRequest {
-	
-  private QueryUnitAttemptId id;
-  private List<FragmentProto> fragments;
-  private String outputTable;
-	private boolean isUpdated;
-	private boolean clusteredOutput;
-	private String serializedData;     // logical node
-	private Boolean interQuery;
-	private List<FetchImpl> fetches;
-  private Boolean shouldDie;
-  private QueryContext queryContext;
-  private DataChannel dataChannel;
-  private Enforcer enforcer;
-	
-	private QueryUnitRequestProto proto = QueryUnitRequestProto.getDefaultInstance();
-	private QueryUnitRequestProto.Builder builder = null;
-	private boolean viaProto = false;
-	
-	public QueryUnitRequestImpl() {
-		builder = QueryUnitRequestProto.newBuilder();
-		this.id = null;
-		this.isUpdated = false;
-	}
-	
-	public QueryUnitRequestImpl(QueryUnitAttemptId id, List<FragmentProto> fragments,
-			String outputTable, boolean clusteredOutput,
-			String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
-		this();
-		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer);
-	}
-	
-	public QueryUnitRequestImpl(QueryUnitRequestProto proto) {
-		this.proto = proto;
-		viaProto = true;
-		id = null;
-		isUpdated = false;
-	}
-	
-	public void set(QueryUnitAttemptId id, List<FragmentProto> fragments,
-			String outputTable, boolean clusteredOutput,
-			String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
-		this.id = id;
-		this.fragments = fragments;
-		this.outputTable = outputTable;
-		this.clusteredOutput = clusteredOutput;
-		this.serializedData = serializedData;
-		this.isUpdated = true;
-    this.queryContext = queryContext;
-    this.queryContext = queryContext;
-    this.dataChannel = dataChannel;
-    this.enforcer = enforcer;
-	}
-
-	@Override
-	public QueryUnitRequestProto getProto() {
-		mergeLocalToProto();
-		proto = viaProto ? proto : builder.build();
-		viaProto = true;
-		return proto;
-	}
-
-	@Override
-	public QueryUnitAttemptId getId() {
-		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (id != null) {
-			return this.id;
-		}
-		if (!p.hasId()) {
-			return null;
-		}
-		this.id = new QueryUnitAttemptId(p.getId());
-		return this.id;
-	}
-
-	@Override
-	public List<FragmentProto> getFragments() {
-		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (fragments != null) {
-			return fragments;
-		}
-		if (fragments == null) {
-			fragments = new ArrayList<FragmentProto>();
-		}
-		for (int i = 0; i < p.getFragmentsCount(); i++) {
-			fragments.add(p.getFragments(i));
-		}
-		return this.fragments;
-	}
-
-	@Override
-	public String getOutputTableId() {
-		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (outputTable != null) {
-			return this.outputTable;
-		}
-		if (!p.hasOutputTable()) {
-			return null;
-		}
-		this.outputTable = p.getOutputTable();
-		return this.outputTable;
-	}
-
-	@Override
-	public boolean isClusteredOutput() {
-		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (isUpdated) {
-			return this.clusteredOutput;
-		}
-		if (!p.hasClusteredOutput()) {
-			return false;
-		}
-		this.clusteredOutput = p.getClusteredOutput();
-		this.isUpdated = true;
-		return this.clusteredOutput;
-	}
-
-	@Override
-	public String getSerializedData() {
-		QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-		if (this.serializedData != null) {
-			return this.serializedData;
-		}
-		if (!p.hasSerializedData()) {
-			return null;
-		}
-		this.serializedData = p.getSerializedData();
-		return this.serializedData;
-	}
-
-	public boolean isInterQuery() {
-	  QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-    if (interQuery != null) {
-      return interQuery;
-    }
-    if (!p.hasInterQuery()) {
-      return false;
-    }
-    this.interQuery = p.getInterQuery();
-    return this.interQuery;
-	}
-	
-	public void setInterQuery() {
-	  maybeInitBuilder();
-	  this.interQuery = true;
-	}
-
-  public void addFetch(String name, FetchImpl fetch) {
-    maybeInitBuilder();
-    initFetches();
-    fetch.setName(name);
-    fetches.add(fetch);
-  }
-
-  public QueryContext getQueryContext(TajoConf conf) {
-    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-    if (queryContext != null) {
-      return queryContext;
-    }
-    if (!p.hasQueryContext()) {
-      return null;
-    }
-    this.queryContext = new QueryContext(conf, p.getQueryContext());
-    return this.queryContext;
-  }
-
-  public void setQueryContext(QueryContext queryContext) {
-    maybeInitBuilder();
-    this.queryContext = queryContext;
-  }
-
-  public void setDataChannel(DataChannel dataChannel) {
-    maybeInitBuilder();
-    this.dataChannel = dataChannel;
-  }
-
-  @Override
-  public DataChannel getDataChannel() {
-    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-    if (dataChannel != null) {
-      return dataChannel;
-    }
-    if (!p.hasDataChannel()) {
-      return null;
-    }
-    this.dataChannel = new DataChannel(p.getDataChannel());
-    return this.dataChannel;
-  }
-
-  @Override
-  public Enforcer getEnforcer() {
-    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-    if (enforcer != null) {
-      return enforcer;
-    }
-    if (!p.hasEnforcer()) {
-      return null;
-    }
-    this.enforcer = new Enforcer(p.getEnforcer());
-    return this.enforcer;
-  }
-
-  public List<FetchImpl> getFetches() {
-	  initFetches();    
-
-    return this.fetches;
-	}
-	
-	private void initFetches() {
-	  if (this.fetches != null) {
-      return;
-    }
-    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-    this.fetches = new ArrayList<FetchImpl>();
-    for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) {
-      fetches.add(new FetchImpl(fetch));
-    }
-	}
-
-  @Override
-  public boolean shouldDie() {
-    QueryUnitRequestProtoOrBuilder p = viaProto ? proto : builder;
-    if (shouldDie != null) {
-      return shouldDie;
-    }
-    if (!p.hasShouldDie()) {
-      return false;
-    }
-    this.shouldDie = p.getShouldDie();
-    return this.shouldDie;
-  }
-
-  @Override
-  public void setShouldDie() {
-    maybeInitBuilder();
-    shouldDie = true;
-  }
-
-  private void maybeInitBuilder() {
-		if (viaProto || builder == null) {
-			builder = QueryUnitRequestProto.newBuilder(proto);
-		}
-		viaProto = true;
-	}
-	
-	private void mergeLocalToBuilder() {
-		if (id != null) {
-			builder.setId(this.id.getProto());
-		}
-		if (fragments != null) {
-			for (int i = 0; i < fragments.size(); i++) {
-				builder.addFragments(fragments.get(i));
-			}
-		}
-		if (this.outputTable != null) {
-			builder.setOutputTable(this.outputTable);
-		}
-		if (this.isUpdated) {
-			builder.setClusteredOutput(this.clusteredOutput);
-		}
-		if (this.serializedData != null) {
-			builder.setSerializedData(this.serializedData);
-		}
-		if (this.interQuery != null) {
-		  builder.setInterQuery(this.interQuery);
-		}
-    if (this.fetches != null) {
-      for (int i = 0; i < fetches.size(); i++) {
-        builder.addFetches(fetches.get(i).getProto());
-      }
-    }
-    if (this.shouldDie != null) {
-      builder.setShouldDie(this.shouldDie);
-    }
-    if (this.queryContext != null) {
-      builder.setQueryContext(queryContext.getProto());
-    }
-    if (this.dataChannel != null) {
-      builder.setDataChannel(dataChannel.getProto());
-    }
-    if (this.enforcer != null) {
-      builder.setEnforcer(enforcer.getProto());
-    }
-	}
-
-	private void mergeLocalToProto() {
-		if(viaProto) {
-			maybeInitBuilder();
-		}
-		mergeLocalToBuilder();
-		proto = builder.build();
-		viaProto = true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
new file mode 100644
index 0000000..a3e586a
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.util.List;
+
+public interface TaskRequest extends ProtoObject<TajoWorkerProtocol.TaskRequestProto> {
+
+	public TaskAttemptId getId();
+	public List<CatalogProtos.FragmentProto> getFragments();
+	public String getOutputTableId();
+	public boolean isClusteredOutput();
+	public String getSerializedData();
+	public boolean isInterQuery();
+	public void setInterQuery();
+	public void addFetch(String name, FetchImpl fetch);
+	public List<FetchImpl> getFetches();
+  public boolean shouldDie();
+  public void setShouldDie();
+  public QueryContext getQueryContext(TajoConf conf);
+  public DataChannel getDataChannel();
+  public Enforcer getEnforcer();
+}
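For context, an editor's sketch (not in the commit): the renamed TaskRequestProto can be built with the generated builder and wrapped in TaskRequestImpl, mirroring how DefaultTaskScheduler constructs its stopTaskRunnerReq later in this diff. TaskRequestExample is a hypothetical name.

  import org.apache.tajo.TaskAttemptId;
  import org.apache.tajo.engine.query.TaskRequest;
  import org.apache.tajo.engine.query.TaskRequestImpl;
  import org.apache.tajo.ipc.TajoWorkerProtocol;

  // Hypothetical sketch: a "should die" request under the renamed proto and interface.
  public class TaskRequestExample {
    public static TaskRequest stopRequest(TaskAttemptId attemptId) {
      TajoWorkerProtocol.TaskRequestProto proto =
          TajoWorkerProtocol.TaskRequestProto.newBuilder()
              .setId(attemptId.getProto())
              .setShouldDie(true)
              .setOutputTable("")
              .build();
      return new TaskRequestImpl(proto);   // request.shouldDie() == true
    }
  }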

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
new file mode 100644
index 0000000..cef5488
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.query;
+
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.global.DataChannel;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskRequestProtoOrBuilder;
+import org.apache.tajo.worker.FetchImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+
+public class TaskRequestImpl implements TaskRequest {
+	
+  private TaskAttemptId id;
+  private List<FragmentProto> fragments;
+  private String outputTable;
+	private boolean isUpdated;
+	private boolean clusteredOutput;
+	private String serializedData;     // logical node
+	private Boolean interQuery;
+	private List<FetchImpl> fetches;
+  private Boolean shouldDie;
+  private QueryContext queryContext;
+  private DataChannel dataChannel;
+  private Enforcer enforcer;
+	
+	private TaskRequestProto proto = TajoWorkerProtocol.TaskRequestProto.getDefaultInstance();
+	private TajoWorkerProtocol.TaskRequestProto.Builder builder = null;
+	private boolean viaProto = false;
+	
+	public TaskRequestImpl() {
+		builder = TaskRequestProto.newBuilder();
+		this.id = null;
+		this.isUpdated = false;
+	}
+	
+	public TaskRequestImpl(TaskAttemptId id, List<FragmentProto> fragments,
+												 String outputTable, boolean clusteredOutput,
+												 String serializedData, QueryContext queryContext, DataChannel channel, Enforcer enforcer) {
+		this();
+		this.set(id, fragments, outputTable, clusteredOutput, serializedData, queryContext, channel, enforcer);
+	}
+	
+	public TaskRequestImpl(TaskRequestProto proto) {
+		this.proto = proto;
+		viaProto = true;
+		id = null;
+		isUpdated = false;
+	}
+	
+	public void set(TaskAttemptId id, List<FragmentProto> fragments,
+			String outputTable, boolean clusteredOutput,
+			String serializedData, QueryContext queryContext, DataChannel dataChannel, Enforcer enforcer) {
+		this.id = id;
+		this.fragments = fragments;
+		this.outputTable = outputTable;
+		this.clusteredOutput = clusteredOutput;
+		this.serializedData = serializedData;
+		this.isUpdated = true;
+    this.queryContext = queryContext;
+    this.queryContext = queryContext;
+    this.dataChannel = dataChannel;
+    this.enforcer = enforcer;
+	}
+
+	@Override
+	public TaskRequestProto getProto() {
+		mergeLocalToProto();
+		proto = viaProto ? proto : builder.build();
+		viaProto = true;
+		return proto;
+	}
+
+	@Override
+	public TaskAttemptId getId() {
+		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (id != null) {
+			return this.id;
+		}
+		if (!p.hasId()) {
+			return null;
+		}
+		this.id = new TaskAttemptId(p.getId());
+		return this.id;
+	}
+
+	@Override
+	public List<FragmentProto> getFragments() {
+		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (fragments != null) {
+			return fragments;
+		}
+		if (fragments == null) {
+			fragments = new ArrayList<FragmentProto>();
+		}
+		for (int i = 0; i < p.getFragmentsCount(); i++) {
+			fragments.add(p.getFragments(i));
+		}
+		return this.fragments;
+	}
+
+	@Override
+	public String getOutputTableId() {
+		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (outputTable != null) {
+			return this.outputTable;
+		}
+		if (!p.hasOutputTable()) {
+			return null;
+		}
+		this.outputTable = p.getOutputTable();
+		return this.outputTable;
+	}
+
+	@Override
+	public boolean isClusteredOutput() {
+		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (isUpdated) {
+			return this.clusteredOutput;
+		}
+		if (!p.hasClusteredOutput()) {
+			return false;
+		}
+		this.clusteredOutput = p.getClusteredOutput();
+		this.isUpdated = true;
+		return this.clusteredOutput;
+	}
+
+	@Override
+	public String getSerializedData() {
+		TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+		if (this.serializedData != null) {
+			return this.serializedData;
+		}
+		if (!p.hasSerializedData()) {
+			return null;
+		}
+		this.serializedData = p.getSerializedData();
+		return this.serializedData;
+	}
+
+	public boolean isInterQuery() {
+	  TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (interQuery != null) {
+      return interQuery;
+    }
+    if (!p.hasInterQuery()) {
+      return false;
+    }
+    this.interQuery = p.getInterQuery();
+    return this.interQuery;
+	}
+	
+	public void setInterQuery() {
+	  maybeInitBuilder();
+	  this.interQuery = true;
+	}
+
+  public void addFetch(String name, FetchImpl fetch) {
+    maybeInitBuilder();
+    initFetches();
+    fetch.setName(name);
+    fetches.add(fetch);
+  }
+
+  public QueryContext getQueryContext(TajoConf conf) {
+    TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (queryContext != null) {
+      return queryContext;
+    }
+    if (!p.hasQueryContext()) {
+      return null;
+    }
+    this.queryContext = new QueryContext(conf, p.getQueryContext());
+    return this.queryContext;
+  }
+
+  public void setQueryContext(QueryContext queryContext) {
+    maybeInitBuilder();
+    this.queryContext = queryContext;
+  }
+
+  public void setDataChannel(DataChannel dataChannel) {
+    maybeInitBuilder();
+    this.dataChannel = dataChannel;
+  }
+
+  @Override
+  public DataChannel getDataChannel() {
+    TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (dataChannel != null) {
+      return dataChannel;
+    }
+    if (!p.hasDataChannel()) {
+      return null;
+    }
+    this.dataChannel = new DataChannel(p.getDataChannel());
+    return this.dataChannel;
+  }
+
+  @Override
+  public Enforcer getEnforcer() {
+    TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (enforcer != null) {
+      return enforcer;
+    }
+    if (!p.hasEnforcer()) {
+      return null;
+    }
+    this.enforcer = new Enforcer(p.getEnforcer());
+    return this.enforcer;
+  }
+
+  public List<FetchImpl> getFetches() {
+	  initFetches();    
+
+    return this.fetches;
+	}
+	
+	private void initFetches() {
+	  if (this.fetches != null) {
+      return;
+    }
+    TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+    this.fetches = new ArrayList<FetchImpl>();
+    for(TajoWorkerProtocol.FetchProto fetch : p.getFetchesList()) {
+      fetches.add(new FetchImpl(fetch));
+    }
+	}
+
+  @Override
+  public boolean shouldDie() {
+    TaskRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (shouldDie != null) {
+      return shouldDie;
+    }
+    if (!p.hasShouldDie()) {
+      return false;
+    }
+    this.shouldDie = p.getShouldDie();
+    return this.shouldDie;
+  }
+
+  @Override
+  public void setShouldDie() {
+    maybeInitBuilder();
+    shouldDie = true;
+  }
+
+  private void maybeInitBuilder() {
+		if (viaProto || builder == null) {
+			builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(proto);
+		}
+		viaProto = true;
+	}
+	
+	private void mergeLocalToBuilder() {
+		if (id != null) {
+			builder.setId(this.id.getProto());
+		}
+		if (fragments != null) {
+			for (int i = 0; i < fragments.size(); i++) {
+				builder.addFragments(fragments.get(i));
+			}
+		}
+		if (this.outputTable != null) {
+			builder.setOutputTable(this.outputTable);
+		}
+		if (this.isUpdated) {
+			builder.setClusteredOutput(this.clusteredOutput);
+		}
+		if (this.serializedData != null) {
+			builder.setSerializedData(this.serializedData);
+		}
+		if (this.interQuery != null) {
+		  builder.setInterQuery(this.interQuery);
+		}
+    if (this.fetches != null) {
+      for (int i = 0; i < fetches.size(); i++) {
+        builder.addFetches(fetches.get(i).getProto());
+      }
+    }
+    if (this.shouldDie != null) {
+      builder.setShouldDie(this.shouldDie);
+    }
+    if (this.queryContext != null) {
+      builder.setQueryContext(queryContext.getProto());
+    }
+    if (this.dataChannel != null) {
+      builder.setDataChannel(dataChannel.getProto());
+    }
+    if (this.enforcer != null) {
+      builder.setEnforcer(enforcer.getProto());
+    }
+	}
+
+	private void mergeLocalToProto() {
+		if(viaProto) {
+			maybeInitBuilder();
+		}
+		mergeLocalToBuilder();
+		proto = builder.build();
+		viaProto = true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 01137aa..d9d496e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -28,18 +28,18 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.RackResolver;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.QueryUnitRequest;
-import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryUnit;
-import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.querymaster.Task;
+import org.apache.tajo.master.querymaster.TaskAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.DataLocation;
@@ -114,14 +114,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     super.start();
   }
 
-  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
-  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+  private static final TaskAttemptId NULL_ATTEMPT_ID;
+  public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
   static {
     ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0);
 
-    TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
-        TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+    TajoWorkerProtocol.TaskRequestProto.Builder builder =
+        TajoWorkerProtocol.TaskRequestProto.newBuilder();
     builder.setId(NULL_ATTEMPT_ID.getProto());
     builder.setShouldDie(true);
     builder.setOutputTable("");
@@ -191,8 +191,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       if (event instanceof FragmentScheduleEvent) {
         FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
         if (context.isLeafQuery()) {
-          QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
-          QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+          TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext();
+          Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++);
           task.addFragment(castEvent.getLeftFragment(), true);
           scheduledObjectNum++;
           if (castEvent.hasRightFragments()) {
@@ -216,8 +216,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       } else if (event instanceof FetchScheduleEvent) {
         FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
         Map<String, List<FetchImpl>> fetches = castEvent.getFetches();
-        QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext();
-        QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+        TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext();
+        Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++);
         scheduledObjectNum++;
         for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) {
           task.addFetches(eachFetch.getKey(), eachFetch.getValue());
@@ -230,8 +230,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask));
         }
         subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
-      } else if (event instanceof QueryUnitAttemptScheduleEvent) {
-        QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
+      } else if (event instanceof TaskAttemptToSchedulerEvent) {
+        TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
         if (context.isLeafQuery()) {
           scheduledRequests.addLeafTask(castEvent);
         } else {
@@ -240,12 +240,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       }
     } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
      // when a subquery is killed, unassigned query unit attempts are canceled from the scheduler.
-      // This event is triggered by QueryUnitAttempt.
-      QueryUnitAttemptScheduleEvent castedEvent = (QueryUnitAttemptScheduleEvent) event;
-      scheduledRequests.leafTasks.remove(castedEvent.getQueryUnitAttempt().getId());
-      LOG.info(castedEvent.getQueryUnitAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
-      ((QueryUnitAttemptScheduleEvent) event).getQueryUnitAttempt().handle(
-          new TaskAttemptEvent(castedEvent.getQueryUnitAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
+      // This event is triggered by TaskAttempt.
+      TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event;
+      scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId());
+      LOG.info(castedEvent.getTaskAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+      ((TaskAttemptToSchedulerEvent) event).getTaskAttempt().handle(
+          new TaskAttemptEvent(castedEvent.getTaskAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
     }
   }
 
@@ -337,8 +337,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
     private final String host;
     private final String rack;
     /** A key is disk volume, and a value is a list of tasks to be scheduled. */
-    private Map<Integer, LinkedHashSet<QueryUnitAttempt>> unassignedTaskForEachVolume =
-        Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<QueryUnitAttempt>>());
+    private Map<Integer, LinkedHashSet<TaskAttempt>> unassignedTaskForEachVolume =
+        Collections.synchronizedMap(new HashMap<Integer, LinkedHashSet<TaskAttempt>>());
     /** A value is last assigned volume id for each task runner */
     private HashMap<TajoContainerId, Integer> lastAssignedVolumeId = new HashMap<TajoContainerId,
       Integer>();
@@ -360,11 +360,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       this.rack = rack;
     }
 
-    public synchronized void addQueryUnitAttempt(int volumeId, QueryUnitAttempt attemptId){
+    public synchronized void addTaskAttempt(int volumeId, TaskAttempt attemptId){
       synchronized (unassignedTaskForEachVolume){
-        LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+        LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
         if (list == null) {
-          list = new LinkedHashSet<QueryUnitAttempt>();
+          list = new LinkedHashSet<TaskAttempt>();
           unassignedTaskForEachVolume.put(volumeId, list);
         }
         list.add(attemptId);
@@ -381,9 +381,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
      *  2. unknown block or Non-splittable task in host
     *  3. remote tasks. unassignedTaskForEachVolume only contains local tasks, so it will be null
      */
-    public synchronized QueryUnitAttemptId getLocalTask(TajoContainerId containerId) {
+    public synchronized TaskAttemptId getLocalTask(TajoContainerId containerId) {
       int volumeId;
-      QueryUnitAttemptId queryUnitAttemptId = null;
+      TaskAttemptId taskAttemptId = null;
 
       if (!lastAssignedVolumeId.containsKey(containerId)) {
         volumeId = getLowestVolumeId();
@@ -396,7 +396,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         int retry = unassignedTaskForEachVolume.size();
         do {
           //clean and get a remaining local task
-          queryUnitAttemptId = getAndRemove(volumeId);
+          taskAttemptId = getAndRemove(volumeId);
           if(!unassignedTaskForEachVolume.containsKey(volumeId)) {
             decreaseConcurrency(containerId);
             if (volumeId > REMOTE) {
@@ -404,7 +404,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
             }
           }
 
-          if (queryUnitAttemptId == null) {
+          if (taskAttemptId == null) {
             //reassign next volume
             volumeId = getLowestVolumeId();
             increaseConcurrency(containerId, volumeId);
@@ -416,19 +416,19 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       } else {
         this.remainTasksNum.set(0);
       }
-      return queryUnitAttemptId;
+      return taskAttemptId;
     }
 
-    public synchronized QueryUnitAttemptId getQueryUnitAttemptIdByRack(String rack) {
-      QueryUnitAttemptId queryUnitAttemptId = null;
+    public synchronized TaskAttemptId getTaskAttemptIdByRack(String rack) {
+      TaskAttemptId taskAttemptId = null;
 
       if (unassignedTaskForEachVolume.size() > 0 && this.rack.equals(rack)) {
         int retry = unassignedTaskForEachVolume.size();
         do {
           //clean and get a remaining task
           int volumeId = getLowestVolumeId();
-          queryUnitAttemptId = getAndRemove(volumeId);
-          if (queryUnitAttemptId == null) {
+          taskAttemptId = getAndRemove(volumeId);
+          if (taskAttemptId == null) {
             if (volumeId > REMOTE) {
               diskVolumeLoads.remove(volumeId);
             }
@@ -438,29 +438,29 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           }
         } while (retry > 0);
       }
-      return queryUnitAttemptId;
+      return taskAttemptId;
     }
 
-    private synchronized QueryUnitAttemptId getAndRemove(int volumeId){
-      QueryUnitAttemptId queryUnitAttemptId = null;
-      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return queryUnitAttemptId;
+    private synchronized TaskAttemptId getAndRemove(int volumeId){
+      TaskAttemptId taskAttemptId = null;
+      if(!unassignedTaskForEachVolume.containsKey(volumeId)) return taskAttemptId;
 
-      LinkedHashSet<QueryUnitAttempt> list = unassignedTaskForEachVolume.get(volumeId);
+      LinkedHashSet<TaskAttempt> list = unassignedTaskForEachVolume.get(volumeId);
       if(list != null && list.size() > 0){
-        QueryUnitAttempt queryUnitAttempt;
+        TaskAttempt taskAttempt;
         synchronized (unassignedTaskForEachVolume) {
-          Iterator<QueryUnitAttempt> iterator = list.iterator();
-          queryUnitAttempt = iterator.next();
+          Iterator<TaskAttempt> iterator = list.iterator();
+          taskAttempt = iterator.next();
           iterator.remove();
         }
 
         this.remainTasksNum.getAndDecrement();
-        queryUnitAttemptId = queryUnitAttempt.getId();
-        for (DataLocation location : queryUnitAttempt.getQueryUnit().getDataLocations()) {
+        taskAttemptId = taskAttempt.getId();
+        for (DataLocation location : taskAttempt.getTask().getDataLocations()) {
           if (!this.getHost().equals(location.getHost())) {
             HostVolumeMapping volumeMapping = scheduledRequests.leafTaskHostMapping.get(location.getHost());
             if (volumeMapping != null) {
-              volumeMapping.removeQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+              volumeMapping.removeTaskAttempt(location.getVolumeId(), taskAttempt);
             }
           }
         }
@@ -469,16 +469,16 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       if(list == null || list.isEmpty()) {
         unassignedTaskForEachVolume.remove(volumeId);
       }
-      return queryUnitAttemptId;
+      return taskAttemptId;
     }
 
-    private synchronized void removeQueryUnitAttempt(int volumeId, QueryUnitAttempt queryUnitAttempt){
+    private synchronized void removeTaskAttempt(int volumeId, TaskAttempt taskAttempt){
       if(!unassignedTaskForEachVolume.containsKey(volumeId)) return;
 
-      LinkedHashSet<QueryUnitAttempt> tasks  = unassignedTaskForEachVolume.get(volumeId);
+      LinkedHashSet<TaskAttempt> tasks  = unassignedTaskForEachVolume.get(volumeId);
 
       if(tasks != null && tasks.size() > 0){
-        tasks.remove(queryUnitAttempt);
+        tasks.remove(taskAttempt);
         remainTasksNum.getAndDecrement();
       } else {
         unassignedTaskForEachVolume.remove(volumeId);
@@ -596,14 +596,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
    // The two lists leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in
     // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner
     // if the task is not included in leafTasks and nonLeafTasks.
-    private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
-    private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
+    private final Set<TaskAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
+    private final Set<TaskAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<TaskAttemptId>());
     private Map<String, HostVolumeMapping> leafTaskHostMapping = Maps.newConcurrentMap();
-    private final Map<String, HashSet<QueryUnitAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
+    private final Map<String, HashSet<TaskAttemptId>> leafTasksRackMapping = Maps.newConcurrentMap();
 
-    private synchronized void addLeafTask(QueryUnitAttemptScheduleEvent event) {
-      QueryUnitAttempt queryUnitAttempt = event.getQueryUnitAttempt();
-      List<DataLocation> locations = queryUnitAttempt.getQueryUnit().getDataLocations();
+    private synchronized void addLeafTask(TaskAttemptToSchedulerEvent event) {
+      TaskAttempt taskAttempt = event.getTaskAttempt();
+      List<DataLocation> locations = taskAttempt.getTask().getDataLocations();
 
       for (DataLocation location : locations) {
         String host = location.getHost();
@@ -614,30 +614,30 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           hostVolumeMapping = new HostVolumeMapping(host, rack);
           leafTaskHostMapping.put(host, hostVolumeMapping);
         }
-        hostVolumeMapping.addQueryUnitAttempt(location.getVolumeId(), queryUnitAttempt);
+        hostVolumeMapping.addTaskAttempt(location.getVolumeId(), taskAttempt);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Added attempt req to host " + host);
         }
 
-        HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
+        HashSet<TaskAttemptId> list = leafTasksRackMapping.get(hostVolumeMapping.getRack());
         if (list == null) {
-          list = new HashSet<QueryUnitAttemptId>();
+          list = new HashSet<TaskAttemptId>();
           leafTasksRackMapping.put(hostVolumeMapping.getRack(), list);
         }
 
-        list.add(queryUnitAttempt.getId());
+        list.add(taskAttempt.getId());
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Added attempt req to rack " + hostVolumeMapping.getRack());
         }
       }
 
-      leafTasks.add(queryUnitAttempt.getId());
+      leafTasks.add(taskAttempt.getId());
     }
 
-    private void addNonLeafTask(QueryUnitAttemptScheduleEvent event) {
-      nonLeafTasks.add(event.getQueryUnitAttempt().getId());
+    private void addNonLeafTask(TaskAttemptToSchedulerEvent event) {
+      nonLeafTasks.add(event.getTaskAttempt().getId());
     }
 
     public int leafTaskNum() {
@@ -648,14 +648,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       return nonLeafTasks.size();
     }
 
-    public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
+    public Set<TaskAttemptId> assignedRequest = new HashSet<TaskAttemptId>();
 
-    private QueryUnitAttemptId allocateLocalTask(String host, TajoContainerId containerId){
+    private TaskAttemptId allocateLocalTask(String host, TajoContainerId containerId){
       HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
 
       if (hostVolumeMapping != null) { //tajo host is located in hadoop datanode
         for (int i = 0; i < hostVolumeMapping.getRemainingLocalTaskSize(); i++) {
-          QueryUnitAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
+          TaskAttemptId attemptId = hostVolumeMapping.getLocalTask(containerId);
 
           if(attemptId == null) break;
           //find remaining local task
@@ -671,11 +671,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
       return null;
     }
 
-    private QueryUnitAttemptId allocateRackTask(String host) {
+    private TaskAttemptId allocateRackTask(String host) {
 
       List<HostVolumeMapping> remainingTasks = Lists.newArrayList(leafTaskHostMapping.values());
       String rack = RackResolver.resolve(host).getNetworkLocation();
-      QueryUnitAttemptId attemptId = null;
+      TaskAttemptId attemptId = null;
 
       if (remainingTasks.size() > 0) {
         synchronized (scheduledRequests) {
@@ -697,7 +697,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
         for (HostVolumeMapping tasks : remainingTasks) {
           for (int i = 0; i < tasks.getRemainingLocalTaskSize(); i++) {
-            QueryUnitAttemptId tId = tasks.getQueryUnitAttemptIdByRack(rack);
+            TaskAttemptId tId = tasks.getTaskAttemptIdByRack(rack);
 
             if (tId == null) break;
 
@@ -713,12 +713,12 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
 
       //find task in rack
       if (attemptId == null) {
-        HashSet<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
+        HashSet<TaskAttemptId> list = leafTasksRackMapping.get(rack);
         if (list != null) {
           synchronized (list) {
-            Iterator<QueryUnitAttemptId> iterator = list.iterator();
+            Iterator<TaskAttemptId> iterator = list.iterator();
             while (iterator.hasNext()) {
-              QueryUnitAttemptId tId = iterator.next();
+              TaskAttemptId tId = iterator.next();
               iterator.remove();
               if (leafTasks.contains(tId)) {
                 leafTasks.remove(tId);
@@ -788,7 +788,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         //////////////////////////////////////////////////////////////////////
         // disk or host-local allocation
         //////////////////////////////////////////////////////////////////////
-        QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);
+        TaskAttemptId attemptId = allocateLocalTask(host, containerId);
 
         if (attemptId == null) { // if a local task cannot be found
           HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
@@ -832,8 +832,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         }
 
         if (attemptId != null) {
-          QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
-          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+          Task task = subQuery.getTask(attemptId.getTaskId());
+          TaskRequest taskAssign = new TaskRequestImpl(
               attemptId,
               new ArrayList<FragmentProto>(task.getAllFragments()),
               "",
@@ -878,7 +878,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
         taskRequest = taskRequests.pollFirst();
         LOG.debug("assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
 
-        QueryUnitAttemptId attemptId;
+        TaskAttemptId attemptId;
         // random allocation
         if (nonLeafTasks.size() > 0) {
           synchronized (nonLeafTasks){
@@ -887,9 +887,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           }
           LOG.debug("Assigned based on * match");
 
-          QueryUnit task;
-          task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
-          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+          Task task;
+          task = subQuery.getTask(attemptId.getTaskId());
+          TaskRequest taskAssign = new TaskRequestImpl(
               attemptId,
               Lists.newArrayList(task.getAllFragments()),
               "",

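To make the rename concrete, here is a minimal sketch of how the "stop task runner" request is now assembled with the new names, mirroring the static initializer in the hunk above. Every call is taken from the diff itself; the wrapper class StopRequestSketch is illustrative, and the original initializer may populate further proto fields beyond those visible in the hunk.

    import org.apache.tajo.ExecutionBlockId;
    import org.apache.tajo.QueryIdFactory;
    import org.apache.tajo.TaskAttemptId;
    import org.apache.tajo.ipc.TajoWorkerProtocol;

    public class StopRequestSketch {
      public static void main(String[] args) {
        // TaskAttemptId/TaskId replace QueryUnitAttemptId/QueryUnitId in the id factory.
        ExecutionBlockId nullEb =
            QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
        TaskAttemptId nullAttempt =
            QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullEb, 0), 0);

        // TaskRequestProto replaces QueryUnitRequestProto; shouldDie marks the stop request.
        // Only the fields visible in the hunk are set here.
        TajoWorkerProtocol.TaskRequestProto.Builder builder =
            TajoWorkerProtocol.TaskRequestProto.newBuilder();
        builder.setId(nullAttempt.getProto());
        builder.setShouldDie(true);
        builder.setOutputTable("");

        // buildPartial() tolerates any required fields the hunk does not show.
        System.out.println(builder.buildPartial());
      }
    }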
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index cc99453..0ab19db 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -24,19 +24,19 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.query.QueryUnitRequest;
-import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.engine.query.TaskRequest;
+import org.apache.tajo.engine.query.TaskRequestImpl;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
+import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryUnit;
-import org.apache.tajo.master.querymaster.QueryUnitAttempt;
+import org.apache.tajo.master.querymaster.Task;
+import org.apache.tajo.master.querymaster.TaskAttempt;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.storage.StorageManager;
@@ -126,14 +126,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
     super.start();
   }
 
-  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
-  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+  private static final TaskAttemptId NULL_ATTEMPT_ID;
+  public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq;
   static {
     ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
-    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0);
 
-    TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
-        TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+    TajoWorkerProtocol.TaskRequestProto.Builder builder =
+        TajoWorkerProtocol.TaskRequestProto.newBuilder();
     builder.setId(NULL_ATTEMPT_ID.getProto());
     builder.setShouldDie(true);
     builder.setOutputTable("");
@@ -214,9 +214,9 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       } else if (event instanceof FetchScheduleEvent) {
         FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
         scheduledFetches.addFetch(castEvent.getFetches());
-      } else if (event instanceof QueryUnitAttemptScheduleEvent) {
-        QueryUnitAttemptScheduleEvent castEvent = (QueryUnitAttemptScheduleEvent) event;
-        assignTask(castEvent.getContext(), castEvent.getQueryUnitAttempt());
+      } else if (event instanceof TaskAttemptToSchedulerEvent) {
+        TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
+        assignTask(castEvent.getContext(), castEvent.getTaskAttempt());
       }
     }
   }
@@ -360,9 +360,9 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
       }
 
       String host = container.getTaskHostName();
-      QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+      TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID,
           host, taskRequest.getCallback());
-      QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+      Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++);
 
       FragmentPair fragmentPair;
       List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
@@ -467,23 +467,23 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
         LOG.debug("Assigned based on * match");
         ContainerProxy container = context.getMasterContext().getResourceAllocator().getContainer(
             taskRequest.getContainerId());
-        QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
+        TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID,
             container.getTaskHostName(), taskRequest.getCallback());
-        QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);
+        Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++);
         task.setFragment(scheduledFragments.getAllFragments());
         subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
       }
     }
   }
 
-  private void assignTask(QueryUnitAttemptScheduleContext attemptContext, QueryUnitAttempt taskAttempt) {
-    QueryUnitAttemptId attemptId = taskAttempt.getId();
-    QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
+  private void assignTask(TaskAttemptScheduleContext attemptContext, TaskAttempt taskAttempt) {
+    TaskAttemptId attemptId = taskAttempt.getId();
+    TaskRequest taskAssign = new TaskRequestImpl(
         attemptId,
-        new ArrayList<FragmentProto>(taskAttempt.getQueryUnit().getAllFragments()),
+        new ArrayList<FragmentProto>(taskAttempt.getTask().getAllFragments()),
         "",
         false,
-        taskAttempt.getQueryUnit().getLogicalPlan().toJson(),
+        taskAttempt.getTask().getLogicalPlan().toJson(),
         context.getMasterContext().getQueryContext(),
         subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
     if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {

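Both schedulers dispatch on the renamed event in the same way: QueryUnitAttemptScheduleEvent becomes TaskAttemptToSchedulerEvent, and its accessor getQueryUnitAttempt() becomes getTaskAttempt(). A minimal sketch of that dispatch pattern, assuming nothing beyond the names shown in the hunks above (the handler class and the log line are illustrative only):

    import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent;
    import org.apache.tajo.master.event.TaskSchedulerEvent;
    import org.apache.tajo.master.querymaster.TaskAttempt;

    class ScheduleEventDispatchSketch {
      void handle(TaskSchedulerEvent event) {
        if (event instanceof TaskAttemptToSchedulerEvent) {
          TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event;
          TaskAttempt attempt = castEvent.getTaskAttempt(); // formerly getQueryUnitAttempt()
          // A real scheduler would queue the attempt as a leaf or non-leaf task here.
          System.out.println("scheduling " + attempt.getId());
        }
      }
    }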
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index 64081f3..aced80c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -21,8 +21,8 @@ package org.apache.tajo.master;
 import com.google.protobuf.ByteString;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.TaskId;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.conf.TajoConf;
@@ -84,7 +84,7 @@ public class NonForwardQueryResultScanner {
       FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{}));
       this.taskContext = new TaskAttemptContext(
           new QueryContext(tajoConf), null,
-          new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0),
+          new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0),
           fragmentProtos, null);
 
       try {

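Where ids are constructed directly rather than through QueryIdFactory, the constructors are renamed one-for-one, as the scanner above shows. A minimal sketch (the wrapper class and method name are illustrative; the constructor chain mirrors the hunk):

    import org.apache.tajo.ExecutionBlockId;
    import org.apache.tajo.QueryId;
    import org.apache.tajo.TaskAttemptId;
    import org.apache.tajo.TaskId;

    class IdConstructionSketch {
      // TaskId replaces QueryUnitId; TaskAttemptId replaces QueryUnitAttemptId.
      static TaskAttemptId firstAttemptOf(QueryId queryId) {
        return new TaskAttemptId(new TaskId(new ExecutionBlockId(queryId, 1), 0), 0);
      }
    }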
http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index 0d2acf7..4649d99 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ContainerProtocol;
@@ -75,7 +75,7 @@ public class TajoContainerProxy extends ContainerProxy {
    *
    * @param taskAttemptId The TaskAttemptId to be killed.
    */
-  public void killTaskAttempt(QueryUnitAttemptId taskAttemptId) {
+  public void killTaskAttempt(TaskAttemptId taskAttemptId) {
     NettyClientBase tajoWorkerRpc = null;
     try {
       InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
index 4d10efe..183aeb5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/ContainerAllocatorEventType.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 public enum ContainerAllocatorEventType {
-  // producer: QueryUnitAttempt, consumer: ContainerAllocator
+  // producer: TaskAttempt, consumer: ContainerAllocator
   CONTAINER_REQ,
   CONTAINER_DEALLOCATE,
   CONTAINER_FAILED

http://git-wip-us.apache.org/repos/asf/tajo/blob/5c852b79/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
index cab2202..5cf9887 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
@@ -19,24 +19,24 @@
 package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.master.container.TajoContainerId;
 
 /**
  * This event is sent to a running TaskAttempt on a worker.
  */
 public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> {
-  private final QueryUnitAttemptId taskAttemptId;
+  private final TaskAttemptId taskAttemptId;
   private final TajoContainerId containerId;
 
-  public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, TajoContainerId containerId,
+  public LocalTaskEvent(TaskAttemptId taskAttemptId, TajoContainerId containerId,
                         LocalTaskEventType eventType) {
     super(eventType);
     this.taskAttemptId = taskAttemptId;
     this.containerId = containerId;
   }
 
-  public QueryUnitAttemptId getTaskAttemptId() {
+  public TaskAttemptId getTaskAttemptId() {
     return taskAttemptId;
   }
 

