tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/2] git commit: TAJO-305: Implement killQuery feature. (hyunsik)
Date Mon, 17 Feb 2014 09:44:40 GMT
TAJO-305: Implement killQuery feature. (hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/ae541ffa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/ae541ffa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/ae541ffa

Branch: refs/heads/master
Commit: ae541ffae406faa53ace5ad19cb503b96c39c549
Parents: e2a7dff
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Mon Feb 17 18:44:19 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Mon Feb 17 18:44:19 2014 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../main/java/org/apache/tajo/cli/TajoCli.java  |  32 +-
 .../java/org/apache/tajo/client/TajoAdmin.java  |  28 +-
 .../java/org/apache/tajo/client/TajoClient.java |   9 +-
 tajo-common/src/main/proto/tajo_protos.proto    |   8 +-
 .../tajo/engine/function/builtin/Sleep.java     |  52 +++
 .../SortBasedColPartitionStoreExec.java         |   8 +-
 .../tajo/master/AbstractTaskScheduler.java      |   3 +-
 .../tajo/master/DefaultTaskScheduler.java       |  11 +
 .../apache/tajo/master/TajoContainerProxy.java  |  20 ++
 .../tajo/master/TajoMasterClientService.java    |  13 +-
 .../tajo/master/event/LocalTaskEvent.java       |  45 +++
 .../tajo/master/event/LocalTaskEventType.java   |  23 ++
 .../tajo/master/event/QueryCompletedEvent.java  |  42 +++
 .../tajo/master/event/QueryEventType.java       |  13 +-
 .../tajo/master/event/QueryFinishEvent.java     |  39 ---
 .../event/QueryMasterQueryCompletedEvent.java   |  39 +++
 .../master/event/SubQueryCompletedEvent.java    |   2 +-
 .../tajo/master/event/SubQueryEventType.java    |   2 +
 .../tajo/master/event/SubQuerySucceeEvent.java  |  30 --
 .../tajo/master/event/SubQueryTaskEvent.java    |  12 +-
 .../tajo/master/event/TaskAttemptEventType.java |   2 +
 .../tajo/master/event/TaskSchedulerEvent.java   |   2 +-
 .../apache/tajo/master/querymaster/Query.java   | 231 ++++++++-----
 .../master/querymaster/QueryInProgress.java     |   6 +
 .../tajo/master/querymaster/QueryJobEvent.java  |   3 +-
 .../querymaster/QueryMasterManagerService.java  |  42 ++-
 .../master/querymaster/QueryMasterTask.java     |  46 ++-
 .../tajo/master/querymaster/QueryUnit.java      | 148 +++++++--
 .../master/querymaster/QueryUnitAttempt.java    | 107 +++++-
 .../tajo/master/querymaster/SubQuery.java       | 327 +++++++++++++------
 .../tajo/master/querymaster/SubQueryState.java  |   5 +-
 .../tajo/worker/TajoResourceAllocator.java      |  26 +-
 .../tajo/worker/TajoWorkerClientService.java    |  48 ++-
 .../tajo/worker/TajoWorkerManagerService.java   |   8 +
 .../main/java/org/apache/tajo/worker/Task.java  |  38 +--
 .../java/org/apache/tajo/worker/TaskRunner.java |   3 +-
 .../src/main/proto/QueryMasterProtocol.proto    |   1 +
 .../src/main/proto/TajoWorkerProtocol.proto     |   4 +-
 .../src/main/resources/webapps/admin/query.jsp  |   3 +-
 .../resources/webapps/worker/querydetail.jsp    |   4 +-
 .../org/apache/tajo/client/TestTajoClient.java  |  15 +-
 42 files changed, 1053 insertions(+), 449 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9e21181..879d816 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -130,6 +130,8 @@ Release 0.8.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-305: Implement killQuery feature. (hyunsik)
+
     TAJO-598: Refactoring Tajo RPC. (jinho)
 
     TAJO-592: HCatalogStore should supports RCFile and default hive field delimiter. (jaehwa)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
index 57a7294..f107c51 100644
--- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
+++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java
@@ -286,7 +286,22 @@ public class TajoCli {
       } else if (cmds[0].equalsIgnoreCase("detach") && cmds.length > 1 && cmds[1].equalsIgnoreCase("table")) {
         // this command should be moved to GlobalEngine
         invokeCommand(cmds);
-
+      } else if (cmds[0].equalsIgnoreCase("explain") && cmds.length > 1) {
+        String sql = stripped.substring(8);
+        ClientProtos.ExplainQueryResponse response = client.explainQuery(sql);
+        if (response == null) {
+          sout.println("response is null");
+        } else {
+          if (response.hasExplain()) {
+            sout.println(response.getExplain());
+          } else {
+            if (response.hasErrorMessage()) {
+              sout.println(response.getErrorMessage());
+            } else {
+              sout.println("No Explain");
+            }
+          }
+        }
       } else { // submit a query to TajoMaster
         ClientProtos.GetQueryStatusResponse response = client.executeQuery(stripped);
         if (response == null) {
@@ -299,7 +314,7 @@ public class TajoCli {
             if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
               sout.println("OK");
             } else {
-              getQueryResult(queryId);
+              waitForQueryCompleted(queryId);
             }
           } finally {
             if(queryId != null) {
@@ -316,11 +331,7 @@ public class TajoCli {
     return 0;
   }
 
-  private boolean isFailed(QueryState state) {
-    return state == QueryState.QUERY_ERROR || state == QueryState.QUERY_FAILED;
-  }
-
-  private void getQueryResult(QueryId queryId) {
+  private void waitForQueryCompleted(QueryId queryId) {
     // if query is empty string
     if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
       return;
@@ -341,14 +352,15 @@ public class TajoCli {
           continue;
         }
 
-        if (status.getState() == QueryState.QUERY_RUNNING ||
-            status.getState() == QueryState.QUERY_SUCCEEDED) {
+        if (status.getState() == QueryState.QUERY_RUNNING || status.getState() == QueryState.QUERY_SUCCEEDED) {
           sout.println("Progress: " + (int)(status.getProgress() * 100.0f)
               + "%, response time: " + ((float)(status.getFinishTime() - status.getSubmitTime()) / 1000.0) + " sec");
           sout.flush();
         }
 
-        if (status.getState() != QueryState.QUERY_RUNNING && status.getState() != QueryState.QUERY_NOT_ASSIGNED) {
+        if (status.getState() != QueryState.QUERY_RUNNING &&
+            status.getState() != QueryState.QUERY_NOT_ASSIGNED &&
+            status.getState() != QueryState.QUERY_KILL_WAIT) {
           break;
         } else {
           Thread.sleep(Math.min(200 * progressRetries, 1000));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
index 98939a5..e6fe88f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoAdmin.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
 import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.io.PrintWriter;
@@ -58,9 +59,10 @@ public class TajoAdmin {
     options = new Options();
     options.addOption("h", "host", true, "Tajo server host");
     options.addOption("p", "port", true, "Tajo server port");
-    options.addOption("list", "list", false, "Show Tajo query list");
-    options.addOption("cluster", "cluster", false, "Show Cluster Info");
-    options.addOption("desc", "desc", false, "Show Query Description");
+    options.addOption("list", null, false, "Show Tajo query list");
+    options.addOption("cluster", null, false, "Show Cluster Info");
+    options.addOption("desc", null, false, "Show Query Description");
+    options.addOption("kill", null, true, "Kill a running query");
   }
 
   private static void printUsage() {
@@ -98,12 +100,17 @@ public class TajoAdmin {
       port = Integer.parseInt(cmd.getOptionValue("p"));
     }
 
+    String queryId = null;
+
     if (cmd.hasOption("list")) {
       cmdType = 1;
     } else if (cmd.hasOption("desc")) {
       cmdType = 2;
     } else if (cmd.hasOption("cluster")) {
       cmdType = 3;
+    } else if (cmd.hasOption("kill")) {
+      cmdType = 4;
+      queryId = cmd.getOptionValue("kill");
     }
 
     // if there is no "-h" option,
@@ -149,6 +156,9 @@ public class TajoAdmin {
       case 3:
         processCluster(writer, client);
         break;
+      case 4:
+        processKill(writer, client, queryId);
+        break;
       default:
         printUsage();
         break;
@@ -375,7 +385,7 @@ public class TajoAdmin {
     writer.write(line);
 
     for (BriefQueryInfo queryInfo : queryList) {
-        String queryId = String.format("q-%s-%04d",
+        String queryId = String.format("q_%s_%04d",
                                        queryInfo.getQueryId().getId(),
                                        queryInfo.getQueryId().getSeq());
         String state = getQueryState(queryInfo.getState());
@@ -386,4 +396,14 @@ public class TajoAdmin {
         writer.write(line);
     }
   }
+
+  public static void processKill(Writer writer, TajoClient client, String queryIdStr)
+      throws IOException, ServiceException {
+    boolean killedSuccessfully = client.killQuery(TajoIdUtils.parseQueryId(queryIdStr));
+    if (killedSuccessfully) {
+      writer.write(queryIdStr + " is killed successfully.\n");
+    } else {
+      writer.write("killing query is failed.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 3aeb40e..d9c511e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -476,25 +476,22 @@ public class TajoClient {
 
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
-      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
-          != QueryState.QUERY_KILLED)) {
+      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() != QueryState.QUERY_KILLED)) {
         try {
-          Thread.sleep(1000L);
+          Thread.sleep(100L);
         } catch(InterruptedException ie) {
-          /** interrupted, just break */
           break;
         }
         currentTimeMillis = System.currentTimeMillis();
         status = getQueryStatus(queryId);
       }
+      return status.getState() == QueryState.QUERY_KILLED;
     } catch(Exception e) {
       LOG.debug("Error when checking for application status", e);
       return false;
     } finally {
       connPool.releaseConnection(tmClient);
     }
-
-    return true;
   }
 
   public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto
index d337315..0abc266 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -30,8 +30,9 @@ enum QueryState {
   QUERY_SUCCEEDED = 5;
   QUERY_FAILED = 6;
   QUERY_KILLED = 7;
-  QUERY_ERROR = 8;
-  QUERY_NOT_ASSIGNED = 9;
+  QUERY_KILL_WAIT = 8;
+  QUERY_ERROR = 9;
+  QUERY_NOT_ASSIGNED = 10;
 }
 
 enum TaskAttemptState {
@@ -42,5 +43,6 @@ enum TaskAttemptState {
   TA_RUNNING = 4;
   TA_SUCCEEDED = 5;
   TA_FAILED = 6;
-  TA_KILLED = 7;
+  TA_KILL_WAIT = 7;
+  TA_KILLED = 8;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
new file mode 100644
index 0000000..0ae8386
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/function/builtin/Sleep.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.function.builtin;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.function.GeneralFunction;
+import org.apache.tajo.engine.function.annotation.Description;
+import org.apache.tajo.engine.function.annotation.ParamTypes;
+import org.apache.tajo.storage.Tuple;
+
+@Description(
+  functionName = "sleep",
+  description = "sleep for seconds",
+  example = "> SELECT sleep(1) from table1;",
+  returnType = TajoDataTypes.Type.INT4,
+  paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT4})}
+)
+public class Sleep extends GeneralFunction {
+
+  public Sleep() {
+    super(NoArgs);
+  }
+
+  @Override
+  public Datum eval(Tuple params) {
+    try {
+      Thread.sleep(params.getInt4(0) * 1000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    return DatumFactory.createInt4(params.getInt4(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 99d673b..8c55d7f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -135,9 +135,11 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
 
   @Override
   public void close() throws IOException {
-    appender.close();
-    StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
-    context.setResultStats(aggregated);
+    if (appender != null) {
+      appender.close();
+      StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
+      context.setResultStats(aggregated);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
index acb1dcc..6c187b6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/AbstractTaskScheduler.java
@@ -24,8 +24,7 @@ import org.apache.tajo.master.event.TaskRequestEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 
 
-public abstract class AbstractTaskScheduler extends AbstractService
-    implements EventHandler<TaskSchedulerEvent> {
+public abstract class AbstractTaskScheduler extends AbstractService implements EventHandler<TaskSchedulerEvent> {
 
   /**
    * Construct the service.

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 4260c98..cd18e10 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -222,6 +222,14 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
           scheduledRequests.addNonLeafTask(castEvent);
         }
       }
+    } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) {
+      // when a subquery is killed, unassigned query unit attmpts are canceled from the scheduler.
+      // This event is triggered by QueryUnitAttempt.
+      QueryUnitAttemptScheduleEvent castedEvent = (QueryUnitAttemptScheduleEvent) event;
+      scheduledRequests.leafTasks.remove(castedEvent.getQueryUnitAttempt().getId());
+      LOG.info(castedEvent.getQueryUnitAttempt().getId() + " is canceled from " + this.getClass().getSimpleName());
+      ((QueryUnitAttemptScheduleEvent) event).getQueryUnitAttempt().handle(
+          new TaskAttemptEvent(castedEvent.getQueryUnitAttempt().getId(), TaskAttemptEventType.TA_SCHEDULE_CANCELED));
     }
   }
 
@@ -360,6 +368,9 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
   }
 
   private class ScheduledRequests {
+    // two list leafTasks and nonLeafTasks keep all tasks to be scheduled. Even though some task is included in
+    // leafTaskHostMapping or leafTasksRackMapping, some task T will not be sent to a task runner
+    // if the task is not included in leafTasks and nonLeafTasks.
     private final Set<QueryUnitAttemptId> leafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private final Set<QueryUnitAttemptId> nonLeafTasks = Collections.synchronizedSet(new HashSet<QueryUnitAttemptId>());
     private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
index f405ea7..e44947e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoContainerProxy.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
@@ -60,6 +61,25 @@ public class TajoContainerProxy extends ContainerProxy {
     assignExecutionBlock(executionBlockId, container);
   }
 
+  /**
+   * It sends a kill RPC request to a corresponding worker.
+   *
+   * @param taskAttemptId The TaskAttemptId to be killed.
+   */
+  public void killTaskAttempt(QueryUnitAttemptId taskAttemptId) {
+    NettyClientBase tajoWorkerRpc = null;
+    try {
+      InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort());
+      tajoWorkerRpc = RpcConnectionPool.getPool(context.getConf()).getConnection(addr, TajoWorkerProtocol.class, true);
+      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
+      tajoWorkerRpcClient.killTaskAttempt(null, taskAttemptId.getProto(), NullCallback.get());
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    } finally {
+      RpcConnectionPool.getPool(context.getConf()).releaseConnection(tajoWorkerRpc);
+    }
+  }
+
   private void assignExecutionBlock(ExecutionBlockId executionBlockId, Container container) {
     NettyClientBase tajoWorkerRpc = null;
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 0f8eb61..856566a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -43,6 +43,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolServ
 import org.apache.tajo.master.TajoMaster.MasterContext;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 import org.apache.tajo.master.querymaster.QueryInfo;
+import org.apache.tajo.master.querymaster.QueryJobEvent;
 import org.apache.tajo.master.querymaster.QueryJobManager;
 import org.apache.tajo.master.rm.WorkerResource;
 import org.apache.tajo.rpc.BlockingRpcServer;
@@ -296,15 +297,15 @@ public class TajoMasterClientService extends AbstractService {
       return builder.build();
     }
 
+    /**
+     * It is invoked by TajoContainerProxy.
+     */
     @Override
-    public BoolProto killQuery(RpcController controller,
-                               TajoIdProtos.QueryIdProto request)
-        throws ServiceException {
+    public BoolProto killQuery(RpcController controller, TajoIdProtos.QueryIdProto request) throws ServiceException {
       QueryId queryId = new QueryId(request);
       QueryJobManager queryJobManager = context.getQueryJobManager();
-      //TODO KHJ, change QueryJobManager to event handler
-      //queryJobManager.handle(new QueryEvent(queryId, QueryEventType.KILL));
-
+      queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
+          new QueryInfo(queryId)));
       return BOOL_TRUE;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
new file mode 100644
index 0000000..92e6695
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEvent.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryUnitAttemptId;
+
+/**
+ * This event is sent to a running TaskAttempt on a worker.
+ */
+public class LocalTaskEvent extends AbstractEvent<LocalTaskEventType> {
+  private final QueryUnitAttemptId taskAttemptId;
+  private final ContainerId containerId;
+
+  public LocalTaskEvent(QueryUnitAttemptId taskAttemptId, ContainerId containerId, LocalTaskEventType eventType) {
+    super(eventType);
+    this.taskAttemptId = taskAttemptId;
+    this.containerId = containerId;
+  }
+
+  public QueryUnitAttemptId getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
new file mode 100644
index 0000000..00b548e
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/LocalTaskEventType.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+public enum LocalTaskEventType {
+  KILL
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
new file mode 100644
index 0000000..dc75a1d
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.master.querymaster.SubQueryState;
+
+public class QueryCompletedEvent extends QueryEvent {
+  private final ExecutionBlockId executionBlockId;
+  private final SubQueryState finalState;
+
+  public QueryCompletedEvent(final ExecutionBlockId executionBlockId,
+                             SubQueryState finalState) {
+    super(executionBlockId.getQueryId(), QueryEventType.QUERY_COMPLETED);
+    this.executionBlockId = executionBlockId;
+    this.finalState = finalState;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
+  public SubQueryState getState() {
+    return finalState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
index d5f7e38..edc0cd8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryEventType.java
@@ -19,9 +19,18 @@
 package org.apache.tajo.master.event;
 
 public enum QueryEventType {
+
+  // Producer: TajoMaster
   START,
-  INTERNAL_ERROR,
-  SUBQUERY_COMPLETED,
   KILL,
+
+  // Producer: SubQuery
+  SUBQUERY_COMPLETED,
+
+  // Producer: Query
+  QUERY_COMPLETED,
+
+  // Producer: Any component
   DIAGNOSTIC_UPDATE,
+  INTERNAL_ERROR,
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
deleted file mode 100644
index 9c81132..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryFinishEvent.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.QueryId;
-
-public class QueryFinishEvent extends AbstractEvent {
-  public enum EventType {
-    QUERY_FINISH
-  }
-
-  private final QueryId queryId;
-
-  public QueryFinishEvent(QueryId queryId) {
-    super(EventType.QUERY_FINISH);
-    this.queryId = queryId;
-  }
-
-  public QueryId getQueryId() {
-    return this.queryId;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
new file mode 100644
index 0000000..bc7e0f4
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryMasterQueryCompletedEvent.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryId;
+
+public class QueryMasterQueryCompletedEvent extends AbstractEvent<QueryMasterQueryCompletedEvent.EventType> {
+  public enum EventType {
+    QUERY_FINISH
+  }
+
+  private final QueryId queryId;
+
+  public QueryMasterQueryCompletedEvent(QueryId queryId) {
+    super(EventType.QUERY_FINISH);
+    this.queryId = queryId;
+  }
+
+  public QueryId getQueryId() {
+    return this.queryId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
index 7e07525..6389798 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
@@ -36,7 +36,7 @@ public class SubQueryCompletedEvent extends QueryEvent {
     return executionBlockId;
   }
 
-  public SubQueryState getFinalState() {
+  public SubQueryState getState() {
     return finalState;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
index 2e56c79..8003ef3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java
@@ -27,6 +27,8 @@ public enum SubQueryEventType {
   SQ_INIT,
   SQ_START,
   SQ_CONTAINER_ALLOCATED,
+  SQ_KILL,
+  SQ_LAUNCH,
 
   // Producer: QueryUnit
   SQ_TASK_COMPLETED,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
deleted file mode 100644
index 2485421..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master.event;
-
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.master.querymaster.SubQueryState;
-
-public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
-  public SubQuerySucceeEvent(final ExecutionBlockId id) {
-    super(id, SubQueryState.SUCCEEDED);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
index 0217f20..0502534 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
@@ -19,19 +19,25 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.master.TaskState;
 
 /**
  * Event Class: From Task to SubQuery
  */
 public class SubQueryTaskEvent extends SubQueryEvent {
   private QueryUnitId taskId;
-  public SubQueryTaskEvent(QueryUnitId taskId,
-                           SubQueryEventType subQueryEventType) {
-    super(taskId.getExecutionBlockId(), subQueryEventType);
+  private TaskState state;
+  public SubQueryTaskEvent(QueryUnitId taskId, TaskState state) {
+    super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED);
     this.taskId = taskId;
+    this.state = state;
   }
 
   public QueryUnitId getTaskId() {
     return this.taskId;
   }
+
+  public TaskState getState() {
+    return state;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
index d9d2f13..e35b154 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptEventType.java
@@ -29,9 +29,11 @@ public enum TaskAttemptEventType {
 
   //Producer:Client, Task
   TA_KILL,
+  TA_LOCAL_KILLED,
 
   //Producer:Scheduler
   TA_ASSIGNED,
+  TA_SCHEDULE_CANCELED,
 
   //Producer:Scheduler
   TA_LAUNCHED,

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
index 9fe2f8c..383845f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -25,7 +25,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 public abstract class TaskSchedulerEvent extends AbstractEvent<EventType> {
   public enum EventType {
     T_SCHEDULE,
-    T_SUBQUERY_COMPLETED
+    T_SCHEDULE_CANCEL
   }
 
   protected final ExecutionBlockId executionBlockId;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index f4a6da7..6a4eb4b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -76,6 +76,10 @@ public class Query implements EventHandler<QueryEvent> {
   private long finishTime;
   private TableDesc resultDesc;
   private int completedSubQueryCount = 0;
+  private int successedSubQueryCount = 0;
+  private int killedSubQueryCount = 0;
+  private int failedSubQueryCount = 0;
+  private int erroredSubQueryCount = 0;
   private final List<String> diagnostics = new ArrayList<String>();
 
   // Internal Variables
@@ -89,6 +93,8 @@ public class Query implements EventHandler<QueryEvent> {
   // Transition Handler
   private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
+  private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition();
+  private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition();
 
   protected static final StateMachineFactory
       <Query,QueryState,QueryEventType,QueryEvent> stateMachineFactory =
@@ -102,23 +108,51 @@ public class Query implements EventHandler<QueryEvent> {
           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_NEW,
               QueryEventType.DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_KILLED,
+              QueryEventType.KILL,
+              new KillNewQueryTransition())
           .addTransition(QueryState.QUERY_NEW, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
           // Transitions from RUNNING state
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
           .addTransition(QueryState.QUERY_RUNNING,
-              EnumSet.of(QueryState.QUERY_RUNNING, QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
                   QueryState.QUERY_ERROR),
-              QueryEventType.SUBQUERY_COMPLETED,
-              new SubQueryCompletedTransition())
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING,
               QueryEventType.DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.KILL,
+              new KillSubQueriesTransition())
           .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
+          // Transitions from KILL_WAIT state
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED,
+                  QueryState.QUERY_ERROR),
+              QueryEventType.QUERY_COMPLETED,
+              QUERY_COMPLETED_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              QueryEventType.DIAGNOSTIC_UPDATE,
+              DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_ERROR,
+              QueryEventType.INTERNAL_ERROR,
+              INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT,
+              EnumSet.of(QueryEventType.KILL))
+
           // Transitions from FAILED state
           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
               QueryEventType.DIAGNOSTIC_UPDATE,
@@ -126,6 +160,9 @@ public class Query implements EventHandler<QueryEvent> {
           .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_FAILED, QueryState.QUERY_FAILED,
+              QueryEventType.KILL)
 
           // Transitions from ERROR state
           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
@@ -134,6 +171,9 @@ public class Query implements EventHandler<QueryEvent> {
           .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
               QueryEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR,
+              QueryEventType.KILL)
 
           .installTopology();
 
@@ -294,78 +334,40 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  public static class SubQueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
+  public static class QueryCompletedTransition implements MultipleArcTransition<Query, QueryEvent, QueryState> {
 
-    private boolean hasNext(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.peek();
-      return !query.getPlan().isTerminal(nextBlock);
-    }
-
-    private QueryState executeNextBlock(Query query) {
-      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
-      ExecutionBlock nextBlock = cursor.nextBlock();
-      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
-      nextSubQuery.setPriority(query.priority--);
-      query.addSubQuery(nextSubQuery);
-      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
-
-      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
-        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+    @Override
+    public QueryState transition(Query query, QueryEvent queryEvent) {
+      QueryCompletedEvent subQueryEvent = (QueryCompletedEvent) queryEvent;
+      QueryState finalState;
+      if (subQueryEvent.getState() == SubQueryState.SUCCEEDED) {
+        finalizeQuery(query, subQueryEvent);
+        finalState = QueryState.QUERY_SUCCEEDED;
+      } else if (subQueryEvent.getState() == SubQueryState.FAILED) {
+        finalState = QueryState.QUERY_FAILED;
+      } else if (subQueryEvent.getState() == SubQueryState.KILLED) {
+        finalState = QueryState.QUERY_KILLED;
+      } else {
+        finalState = QueryState.QUERY_ERROR;
       }
-
-      return query.checkQueryForCompleted();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
+      query.setFinishTime();
+      return finalState;
     }
 
-    private QueryState finalizeQuery(Query query, SubQueryCompletedEvent event) {
+    private void finalizeQuery(Query query, QueryCompletedEvent event) {
       MasterPlan masterPlan = query.getPlan();
 
-      if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-        ExecutionBlock terminal = query.getPlan().getTerminalBlock();
-        DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
-        Path finalOutputDir = commitOutputData(query);
+      ExecutionBlock terminal = query.getPlan().getTerminalBlock();
+      DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+      Path finalOutputDir = commitOutputData(query);
 
-        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-        try {
-          hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
-              finalOutputDir);
-        } catch (Exception e) {
-          query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
-          return QueryState.QUERY_FAILED;
-        } finally {
-          query.setFinishTime();
-        }
-        query.finished(QueryState.QUERY_SUCCEEDED);
-        query.eventHandler.handle(new QueryFinishEvent(query.getId()));
-      }
-
-      return QueryState.QUERY_SUCCEEDED;
-    }
-
-    @Override
-    public QueryState transition(Query query, QueryEvent event) {
-      // increase the count for completed subqueries
-      query.completedSubQueryCount++;
-
-      SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
-
-      // if the subquery is succeeded
-      if (castEvent.getFinalState() == SubQueryState.SUCCEEDED) {
-        if (hasNext(query)) { // if there is next block
-          return executeNextBlock(query);
-        } else {
-          return finalizeQuery(query, castEvent);
-        }
-      } else {
-        query.setFinishTime();
-
-        if (castEvent.getFinalState() == SubQueryState.ERROR) {
-          return QueryState.QUERY_ERROR;
-        } else {
-          return QueryState.QUERY_FAILED;
-        }
+      QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
+      try {
+        hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(),
+            finalOutputDir);
+      } catch (Exception e) {
+        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
       }
     }
 
@@ -541,6 +543,64 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
+  public static class SubQueryCompletedTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    private boolean hasNext(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.peek();
+      return !query.getPlan().isTerminal(nextBlock);
+    }
+
+    private void executeNextBlock(Query query) {
+      ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
+      ExecutionBlock nextBlock = cursor.nextBlock();
+      SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+      nextSubQuery.setPriority(query.priority--);
+      query.addSubQuery(nextSubQuery);
+      nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
+
+      LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+        LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+      }
+    }
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      try {
+        query.completedSubQueryCount++;
+        SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event;
+
+        if (castEvent.getState() == SubQueryState.SUCCEEDED) {
+          query.successedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.KILLED) {
+          query.killedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.FAILED) {
+          query.failedSubQueryCount++;
+        } else if (castEvent.getState() == SubQueryState.ERROR) {
+          query.erroredSubQueryCount++;
+        } else {
+          LOG.error(String.format("Invalid SubQuery (%s) State %s at %s",
+              castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getState().name()));
+          query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+        }
+
+        // if a subquery is succeeded and a query is running
+        if (castEvent.getState() == SubQueryState.SUCCEEDED &&  // latest subquery succeeded
+            query.getState() == QueryState.QUERY_RUNNING &&     // current state is not in KILL_WAIT, FAILED, or ERROR.
+            hasNext(query)) {                                   // there remains at least one subquery.
+          executeNextBlock(query);
+        } else { // if a query is completed due to finished, kill, failure, or error
+          query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState()));
+        }
+      } catch (Throwable t) {
+        LOG.error(t);
+        query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR));
+      }
+    }
+  }
+
   private static class DiagnosticsUpdateTransition implements SingleArcTransition<Query, QueryEvent> {
     @Override
     public void transition(Query query, QueryEvent event) {
@@ -548,32 +608,34 @@ public class Query implements EventHandler<QueryEvent> {
     }
   }
 
-  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
-
+  private static class KillNewQueryTransition implements SingleArcTransition<Query, QueryEvent> {
     @Override
     public void transition(Query query, QueryEvent event) {
       query.setFinishTime();
-      query.finished(QueryState.QUERY_ERROR);
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
     }
   }
 
-  public QueryState finished(QueryState finalState) {
-    setFinishTime();
-    return finalState;
+  private static class KillSubQueriesTransition implements SingleArcTransition<Query, QueryEvent> {
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      synchronized (query.subqueries) {
+        for (SubQuery subquery : query.subqueries.values()) {
+          query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL));
+        }
+      }
+    }
   }
 
-  /**
-   * Check if all subqueries of the query are completed
-   * @return QueryState.QUERY_SUCCEEDED if all subqueries are completed.
-   */
-  QueryState checkQueryForCompleted() {
-    if (completedSubQueryCount == subqueries.size()) {
-      return QueryState.QUERY_SUCCEEDED;
+  private static class InternalErrorTransition implements SingleArcTransition<Query, QueryEvent> {
+
+    @Override
+    public void transition(Query query, QueryEvent event) {
+      query.setFinishTime();
+      query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
     }
-    return getState();
   }
 
-
   @Override
   public void handle(QueryEvent event) {
     LOG.info("Processing " + event.getQueryId() + " of type " + event.getType());
@@ -589,8 +651,7 @@ public class Query implements EventHandler<QueryEvent> {
 
       //notify the eventhandler of state change
       if (oldState != getState()) {
-        LOG.info(id + " Query Transitioned from " + oldState + " to "
-            + getState());
+        LOG.info(id + " Query Transitioned from " + oldState + " to " + getState());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index 2a93d3c..6dc437f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -91,6 +91,10 @@ public class QueryInProgress extends CompositeService {
     super.init(conf);
   }
 
+  public void kill() {
+    queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get());
+  }
+
   @Override
   public void stop() {
     if(stopped.getAndSet(true)) {
@@ -172,6 +176,8 @@ public class QueryInProgress extends CompositeService {
         submmitQueryToMaster();
       } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
         stop();
+      } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) {
+        kill();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
index b2c129f..811de1b 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java
@@ -38,6 +38,7 @@ public class QueryJobEvent extends AbstractEvent<QueryJobEvent.Type> {
     QUERY_JOB_HEARTBEAT,
     QUERY_JOB_FINISH,
     QUERY_MASTER_START,
-    QUERY_MASTER_STOP
+    QUERY_MASTER_STOP,
+    QUERY_JOB_KILL
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
index 5ce57f7..3c30e38 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java
@@ -26,10 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.*;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.QueryMasterProtocol;
@@ -128,13 +125,12 @@ public class QueryMasterManagerService extends CompositeService
     try {
       ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
       QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
-      ContainerId cid =
-          queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
 
       if(queryMasterTask.isStopped()) {
-        LOG.debug("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
         done.run(LazyTaskScheduler.stopTaskRunnerReq);
       } else {
+        ContainerId cid =
+            queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
         LOG.debug("getTask:" + cid + ", ebId:" + ebId);
         queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
       }
@@ -147,10 +143,26 @@ public class QueryMasterManagerService extends CompositeService
   public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
                            RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
-      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(
-          new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId()));
-      queryMasterTask.getEventHandler().handle(
-          new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      QueryId queryId = new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(request.getId());
+      LOG.info("statusUpdate from " + attemptId);
+      QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+      if (queryMasterTask == null) {
+        queryMasterTask = queryMaster.getQueryMasterTask(queryId, true);
+      }
+      SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId());
+      QueryUnit task = sq.getQueryUnit(attemptId.getQueryUnitId());
+      QueryUnitAttempt attempt = task.getAttempt(attemptId.getId());
+      LOG.info(String.format("Task State: %s, Attempt State: %s", task.getState().name(), attempt.getState().name()));
+      if (request.getState() == TajoProtos.TaskAttemptState.TA_KILLED) {
+        LOG.info(attemptId + " Killed");
+        attempt.handle(
+            new TaskAttemptEvent(new QueryUnitAttemptId(request.getId()), TaskAttemptEventType.TA_LOCAL_KILLED));
+      } else {
+        LOG.info(attemptId + " updated");
+        queryMasterTask.getEventHandler().handle(
+            new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      }
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -194,6 +206,14 @@ public class QueryMasterManagerService extends CompositeService
   }
 
   @Override
+  public void killQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
+                        RpcCallback<PrimitiveProtos.BoolProto> done) {
+    QueryId queryId = new QueryId(request);
+    QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(queryId);
+    queryMasterTask.getQuery().handle(new QueryEvent(queryId, QueryEventType.KILL));
+  }
+
+  @Override
   public void executeQuery(RpcController controller,
                            TajoWorkerProtocol.QueryExecutionRequestProto request,
                            RpcCallback<PrimitiveProtos.BoolProto> done) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index eb0528c..2c3ddfe 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -45,6 +45,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.GlobalEngine;
 import org.apache.tajo.master.TajoAsyncDispatcher;
+import org.apache.tajo.master.TajoContainerProxy;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.rpc.CallFuture;
@@ -64,6 +65,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.tajo.TajoProtos.QueryState;
+
 public class QueryMasterTask extends CompositeService {
   private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
 
@@ -135,8 +138,9 @@ public class QueryMasterTask extends CompositeService {
       dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher());
       dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
       dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher());
-      dispatcher.register(QueryFinishEvent.EventType.class, new QueryFinishEventHandler());
+      dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler());
       dispatcher.register(TaskSchedulerEvent.EventType.class, new TaskSchedulerDispatcher());
+      dispatcher.register(LocalTaskEventType.class, new LocalTaskEventHandler());
 
       initStagingDir();
 
@@ -247,12 +251,38 @@ public class QueryMasterTask extends CompositeService {
     }
   }
 
-  private static class QueryFinishEventHandler implements EventHandler<QueryFinishEvent> {
+  private class LocalTaskEventHandler implements EventHandler<LocalTaskEvent> {
+    @Override
+    public void handle(LocalTaskEvent event) {
+      TajoContainerProxy proxy = (TajoContainerProxy) resourceAllocator.getContainers().get(event.getContainerId());
+      proxy.killTaskAttempt(event.getTaskAttemptId());
+    }
+  }
+
+  private class QueryFinishEventHandler implements EventHandler<QueryMasterQueryCompletedEvent> {
     @Override
-    public void handle(QueryFinishEvent event) {
+    public void handle(QueryMasterQueryCompletedEvent event) {
       QueryId queryId = event.getQueryId();
-      LOG.info("Query end notification started for QueryId : " + queryId);
-      //QueryMaster must be lived until client fetching all query result data.
+      LOG.info("Query completion notified from " + queryId);
+
+      while (!isTerminatedState(query.getState())) {
+        try {
+          synchronized (this) {
+            wait(10);
+          }
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+      LOG.info("Query final state: " + query.getState());
+      queryMasterContext.stopQuery(queryId);
+    }
+
+    private boolean isTerminatedState(QueryState state) {
+      return
+          state == QueryState.QUERY_SUCCEEDED ||
+          state == QueryState.QUERY_FAILED ||
+          state == QueryState.QUERY_KILLED;
     }
   }
 
@@ -307,7 +337,6 @@ public class QueryMasterTask extends CompositeService {
 
       MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
       queryMasterContext.getGlobalPlanner().build(masterPlan);
-      //this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(masterPlan);
 
       query = new Query(queryTaskContext, queryId, querySubmitTime,
           "", queryTaskContext.getEventHandler(), masterPlan);
@@ -437,9 +466,9 @@ public class QueryMasterTask extends CompositeService {
     return queryId;
   }
 
-  public TajoProtos.QueryState getState() {
+  public QueryState getState() {
     if(query == null) {
-      return TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
+      return QueryState.QUERY_NOT_ASSIGNED;
     } else {
       return query.getState();
     }
@@ -513,5 +542,4 @@ public class QueryMasterTask extends CompositeService {
       return queryMetrics;
     }
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index a0f3dfd..2e4bd70 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -87,30 +87,73 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 
   private List<DataLocation> dataLocations = Lists.newArrayList();
 
+  private static final AttemptKilledTransition ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+
   protected static final StateMachineFactory
       <QueryUnit, TaskState, TaskEventType, TaskEvent> stateMachineFactory =
-      new StateMachineFactory
-          <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
-
-      .addTransition(TaskState.NEW, TaskState.SCHEDULED,
-          TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-
-       .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
-           TaskEventType.T_ATTEMPT_LAUNCHED, new AttemptLaunchedTransition())
-
-        .addTransition(TaskState.RUNNING, TaskState.RUNNING,
-           TaskEventType.T_ATTEMPT_LAUNCHED)
-
-       .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
-           TaskEventType.T_ATTEMPT_SUCCEEDED, new AttemptSucceededTransition())
+      new StateMachineFactory <QueryUnit, TaskState, TaskEventType, TaskEvent>(TaskState.NEW)
+
+          // Transitions from NEW state
+          .addTransition(TaskState.NEW, TaskState.SCHEDULED,
+              TaskEventType.T_SCHEDULE,
+              new InitialScheduleTransition())
+          .addTransition(TaskState.NEW, TaskState.KILLED,
+              TaskEventType.T_KILL,
+              new KillNewTaskTransition())
+
+          // Transitions from SCHEDULED state
+          .addTransition(TaskState.SCHEDULED, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new AttemptLaunchedTransition())
+          .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+
+          // Transitions from RUNNING state
+          .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+              TaskEventType.T_ATTEMPT_LAUNCHED)
+          .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              new AttemptSucceededTransition())
+          .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT,
+              TaskEventType.T_KILL,
+              new KillTaskTransition())
+          .addTransition(TaskState.RUNNING,
+              EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedOrRetryTransition())
+
+          // Transitions from KILL_WAIT state
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_KILLED,
+              ATTEMPT_KILLED_TRANSITION)
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              TaskEventType.T_ATTEMPT_LAUNCHED,
+              new KillTaskTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.FAILED,
+              TaskEventType.T_ATTEMPT_FAILED,
+              new AttemptFailedTransition())
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILLED,
+              TaskEventType.T_ATTEMPT_SUCCEEDED,
+              ATTEMPT_KILLED_TRANSITION)
+              // Ignore-able transitions.
+          .addTransition(TaskState.KILL_WAIT, TaskState.KILL_WAIT,
+              EnumSet.of(
+                  TaskEventType.T_KILL,
+                  TaskEventType.T_SCHEDULE))
+
+          // Transitions from SUCCEEDED state
+          // Ignore-able transitions
+          .addTransition(TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+              EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+          // Transitions from FAILED state
+          // Ignore-able transitions
+          .addTransition(TaskState.FAILED, TaskState.FAILED,
+              EnumSet.of(TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_KILLED, TaskEventType.T_ATTEMPT_SUCCEEDED))
+
+          .installTopology();
 
-       .addTransition(TaskState.RUNNING,
-            EnumSet.of(TaskState.RUNNING, TaskState.FAILED),
-            TaskEventType.T_ATTEMPT_FAILED, new AttemptFailedTransition())
-
-
-
-      .installTopology();
   private final StateMachine<TaskState, TaskEventType, TaskEvent> stateMachine;
 
 
@@ -406,6 +449,36 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     }
   }
 
+  private void finishTask() {
+    this.finishTime = System.currentTimeMillis();
+  }
+
+  private static class KillNewTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+    }
+  }
+
+  private static class KillTaskTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent taskEvent) {
+      task.finishTask();
+      task.eventHandler.handle(new TaskAttemptEvent(task.lastAttemptId, TaskAttemptEventType.TA_KILL));
+    }
+  }
+
+  private static class AttemptKilledTransition implements SingleArcTransition<QueryUnit, TaskEvent>{
+
+    @Override
+    public void transition(QueryUnit task, TaskEvent event) {
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.KILLED));
+    }
+  }
+
   private static class AttemptSucceededTransition
       implements SingleArcTransition<QueryUnit, TaskEvent>{
 
@@ -413,15 +486,14 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     public void transition(QueryUnit task,
                            TaskEvent event) {
       TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(
-          attemptEvent.getTaskAttemptId());
+      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
 
       task.successfulAttempt = attemptEvent.getTaskAttemptId();
       task.succeededHost = attempt.getHost();
-      task.finishTime = System.currentTimeMillis();
       task.succeededPullServerPort = attempt.getPullServerPort();
-      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(),
-          SubQueryEventType.SQ_TASK_COMPLETED));
+
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(event.getTaskId(), TaskState.SUCCEEDED));
     }
   }
 
@@ -430,14 +502,28 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     public void transition(QueryUnit task,
                            TaskEvent event) {
       TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
-      QueryUnitAttempt attempt = task.attempts.get(
-          attemptEvent.getTaskAttemptId());
+      QueryUnitAttempt attempt = task.attempts.get(attemptEvent.getTaskAttemptId());
       task.launchTime = System.currentTimeMillis();
       task.succeededHost = attempt.getHost();
     }
   }
 
-  private static class AttemptFailedTransition implements
+  private static class AttemptFailedTransition implements SingleArcTransition<QueryUnit, TaskEvent> {
+    @Override
+    public void transition(QueryUnit task, TaskEvent event) {
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      LOG.info("=============================================================");
+      LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<");
+      LOG.info("=============================================================");
+      task.failedAttempts++;
+      task.finishedAttempts++;
+
+      task.finishTask();
+      task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
+    }
+  }
+
+  private static class AttemptFailedOrRetryTransition implements
     MultipleArcTransition<QueryUnit, TaskEvent, TaskState> {
 
     @Override
@@ -454,8 +540,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
           task.addAndScheduleAttempt();
         }
       } else {
-        task.eventHandler.handle(
-            new SubQueryTaskEvent(task.getId(), SubQueryEventType.SQ_FAILED));
+        task.finishTask();
+        task.eventHandler.handle(new SubQueryTaskEvent(task.getId(), TaskState.FAILED));
         return TaskState.FAILED;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index f5001cc..aac5e37 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.querymaster;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
 import org.apache.tajo.QueryUnitAttemptId;
@@ -51,6 +52,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   private final QueryUnit queryUnit;
   final EventHandler eventHandler;
 
+  private ContainerId containerId;
   private String hostName;
   private int port;
   private int expire;
@@ -68,44 +70,87 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       <QueryUnitAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
       (TaskAttemptState.TA_NEW)
 
+      // Transitions from TA_NEW state
       .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
           TaskAttemptEventType.TA_SCHEDULE, new TaskAttemptScheduleTransition())
       .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_UNASSIGNED,
           TaskAttemptEventType.TA_RESCHEDULE, new TaskAttemptScheduleTransition())
+      .addTransition(TaskAttemptState.TA_NEW, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_KILL,
+          new TaskKilledCompleteTransition())
 
+      // Transitions from TA_UNASSIGNED state
       .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_ASSIGNED,
-          TaskAttemptEventType.TA_ASSIGNED, new LaunchTransition())
+          TaskAttemptEventType.TA_ASSIGNED,
+          new LaunchTransition())
+      .addTransition(TaskAttemptState.TA_UNASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillUnassignedTaskTransition())
 
-      // from assigned
+      // Transitions from TA_ASSIGNED state
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_ASSIGNED,
           TaskAttemptEventType.TA_ASSIGNED, new AlreadyAssignedTransition())
+      .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
       .addTransition(TaskAttemptState.TA_ASSIGNED,
           EnumSet.of(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILLED),
           TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
       .addTransition(TaskAttemptState.TA_ASSIGNED, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
 
-      // from running
+      // Transitions from TA_RUNNING state
       .addTransition(TaskAttemptState.TA_RUNNING,
           EnumSet.of(TaskAttemptState.TA_RUNNING),
           TaskAttemptEventType.TA_UPDATE, new StatusUpdateTransition())
-
+      .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_KILL,
+          new KillTaskTransition())
       .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_DONE, new SucceededTransition())
-
       .addTransition(TaskAttemptState.TA_RUNNING, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
 
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_LOCAL_KILLED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          TaskAttemptEventType.TA_ASSIGNED,
+          new KillTaskTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_SCHEDULE_CANCELED,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DONE,
+          new TaskKilledCompleteTransition())
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_FAILED,
+          TaskAttemptEventType.TA_FATAL_ERROR)
+      .addTransition(TaskAttemptState.TA_KILL_WAIT, TaskAttemptState.TA_KILL_WAIT,
+          EnumSet.of(
+              TaskAttemptEventType.TA_KILL,
+              TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+              TaskAttemptEventType.TA_UPDATE))
+
+      // Transitions from TA_SUCCEEDED state
       .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_UPDATE)
       .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
           TaskAttemptEventType.TA_DONE, new AlreadyDoneTransition())
       .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_FAILED,
           TaskAttemptEventType.TA_FATAL_ERROR, new FailedTransition())
+       // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_SUCCEEDED, TaskAttemptState.TA_SUCCEEDED,
+          TaskAttemptEventType.TA_KILL)
+
+      // Transitions from TA_KILLED state
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE)
+      // Ignore-able transitions
+      .addTransition(TaskAttemptState.TA_KILLED, TaskAttemptState.TA_KILLED,
+          EnumSet.of(
+              TaskAttemptEventType.TA_UPDATE))
 
       .installTopology();
 
@@ -158,6 +203,10 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     return this.port;
   }
 
+  public void setContainerId(ContainerId containerId) {
+    this.containerId = containerId;
+  }
+
   public void setHost(String host) {
     this.hostName = host;
   }
@@ -204,17 +253,27 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   }
 
   private static class TaskAttemptScheduleTransition implements
-    SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
 
     @Override
-    public void transition(QueryUnitAttempt taskAttempt,
-                           TaskAttemptEvent taskAttemptEvent) {
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
       taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
           EventType.T_SCHEDULE, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
           taskAttempt.scheduleContext, taskAttempt));
     }
   }
 
+  private static class KillUnassignedTaskTransition implements
+      SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent taskAttemptEvent) {
+      taskAttempt.eventHandler.handle(new QueryUnitAttemptScheduleEvent(
+          EventType.T_SCHEDULE_CANCEL, taskAttempt.getQueryUnit().getId().getExecutionBlockId(),
+          taskAttempt.scheduleContext, taskAttempt));
+    }
+  }
+
   private static class LaunchTransition
       implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
 
@@ -222,6 +281,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     public void transition(QueryUnitAttempt taskAttempt,
                            TaskAttemptEvent event) {
       TaskAttemptAssignedEvent castEvent = (TaskAttemptAssignedEvent) event;
+      taskAttempt.containerId = castEvent.getContainerId();
       taskAttempt.setHost(castEvent.getHostName());
       taskAttempt.setPullServerPort(castEvent.getPullServerPort());
       taskAttempt.eventHandler.handle(
@@ -230,20 +290,29 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     }
   }
 
+  private static class TaskKilledCompleteTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt,
+                           TaskAttemptEvent event) {
+      taskAttempt.getQueryUnit().handle(new TaskEvent(taskAttempt.getId().getQueryUnitId(),
+          TaskEventType.T_ATTEMPT_KILLED));
+      LOG.info(taskAttempt.getId() + " Received TA_KILLED Status from LocalTask");
+    }
+  }
+
   private static class StatusUpdateTransition
       implements MultipleArcTransition<QueryUnitAttempt, TaskAttemptEvent, TaskAttemptState> {
 
     @Override
     public TaskAttemptState transition(QueryUnitAttempt taskAttempt,
                                        TaskAttemptEvent event) {
-      TaskAttemptStatusUpdateEvent updateEvent =
-          (TaskAttemptStatusUpdateEvent) event;
+      TaskAttemptStatusUpdateEvent updateEvent = (TaskAttemptStatusUpdateEvent) event;
 
       switch (updateEvent.getStatus().getState()) {
         case TA_PENDING:
         case TA_RUNNING:
           return TaskAttemptState.TA_RUNNING;
-
         default:
           return taskAttempt.getState();
       }
@@ -289,6 +358,15 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
     }
   }
 
+  private static class KillTaskTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent> {
+
+    @Override
+    public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
+      taskAttempt.eventHandler.handle(new LocalTaskEvent(taskAttempt.getId(), taskAttempt.containerId,
+          LocalTaskEventType.KILL));
+    }
+  }
+
   private static class FailedTransition implements SingleArcTransition<QueryUnitAttempt, TaskAttemptEvent>{
     @Override
     public void transition(QueryUnitAttempt taskAttempt, TaskAttemptEvent event) {
@@ -302,8 +380,7 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
   @Override
   public void handle(TaskAttemptEvent event) {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Processing " + event.getTaskAttemptId() + " of type "
-          + event.getType());
+      LOG.debug("Processing " + event.getTaskAttemptId() + " of type " + event.getType());
     }
     try {
       writeLock.lock();


Mime
View raw message