tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/2] TAJO-305: Implement killQuery feature. (hyunsik)
Date Mon, 17 Feb 2014 09:44:39 GMT
Repository: incubator-tajo
Updated Branches:
  refs/heads/master e2a7dffdb -> ae541ffae


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index bcdba43..7e1a9bd 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -20,6 +20,7 @@ package org.apache.tajo.master.querymaster;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -96,7 +97,11 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition();
   private static final InternalErrorTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition();
   private static final ContainerLaunchTransition CONTAINER_LAUNCH_TRANSITION = new ContainerLaunchTransition();
-  private static final FailedTransition FAILED_TRANSITION = new FailedTransition();
+  private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition();
+  private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION =
+      new AllocatedContainersCancelTransition();
+  private static final SubQueryCompleteTransition SUBQUERY_COMPLETED_TRANSITION =
+      new SubQueryCompleteTransition();
   private StateMachine<SubQueryState, SubQueryEventType, SubQueryEvent> stateMachine;
 
   protected static final StateMachineFactory<SubQuery, SubQueryState,
@@ -106,107 +111,142 @@ public class SubQuery implements EventHandler<SubQueryEvent>
{
 
           // Transitions from NEW state
           .addTransition(SubQueryState.NEW,
-              EnumSet.of(SubQueryState.INIT, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
-              SubQueryEventType.SQ_INIT, new InitAndRequestContainer())
+              EnumSet.of(SubQueryState.INITED, SubQueryState.ERROR, SubQueryState.SUCCEEDED),
+              SubQueryEventType.SQ_INIT,
+              new InitAndRequestContainer())
           .addTransition(SubQueryState.NEW, SubQueryState.NEW,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          .addTransition(SubQueryState.NEW, SubQueryState.KILLED,
+              SubQueryEventType.SQ_KILL)
           .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from INIT state
-          .addTransition(SubQueryState.INIT, SubQueryState.CONTAINER_ALLOCATED,
+          // Transitions from INITED state
+          .addTransition(SubQueryState.INITED, SubQueryState.RUNNING,
               SubQueryEventType.SQ_CONTAINER_ALLOCATED,
               CONTAINER_LAUNCH_TRANSITION)
-          .addTransition(SubQueryState.INIT, SubQueryState.INIT,
+          .addTransition(SubQueryState.INITED, SubQueryState.INITED,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.NEW, SubQueryState.ERROR,
+          .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_KILL)
+          .addTransition(SubQueryState.INITED, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from CONTAINER_ALLOCATED state
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED,
-              EnumSet.of(SubQueryState.RUNNING, SubQueryState.FAILED, SubQueryState.SUCCEEDED),
-              SubQueryEventType.SQ_START,
-              new StartTransition())
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+          // Transitions from RUNNING state
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
               SubQueryEventType.SQ_CONTAINER_ALLOCATED,
               CONTAINER_LAUNCH_TRANSITION)
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.CONTAINER_ALLOCATED,
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_TASK_COMPLETED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING,
+              EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED),
+              SubQueryEventType.SQ_SUBQUERY_COMPLETED,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SubQueryEventType.SQ_FAILED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.CONTAINER_ALLOCATED, SubQueryState.ERROR,
+          .addTransition(SubQueryState.RUNNING, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_KILL,
+              new KillTasksTransition())
+          .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-
-          // Transitions from RUNNING state
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
-              CONTAINER_LAUNCH_TRANSITION)
+          // Ignore-able Transition
           .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
               SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+
+          // Transitions from KILL_WAIT state
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
+              EnumSet.of(SubQueryEventType.SQ_KILL))
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
               SubQueryEventType.SQ_TASK_COMPLETED,
-              new TaskCompletedTransition())
-          .addTransition(SubQueryState.RUNNING, SubQueryState.SUCCEEDED,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT,
+              EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED, SubQueryState.KILLED),
               SubQueryEventType.SQ_SUBQUERY_COMPLETED,
-              new SubQueryCompleteTransition())
-          .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING,
+              SUBQUERY_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.FAILED,
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT,
               SubQueryEventType.SQ_FAILED,
-              FAILED_TRANSITION)
-          .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR,
+              TASK_COMPLETED_TRANSITION)
+          .addTransition(SubQueryState.KILL_WAIT, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
 
-          // Transitions from SUCCEEDED state
+              // Transitions from SUCCEEDED state
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
-              SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
           .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+              // Ignore-able events
+          .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED,
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_CONTAINER_ALLOCATED))
 
           // Transitions from FAILED state
           .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_START)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_CONTAINER_ALLOCATED)
-          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
-              SubQueryEventType.SQ_FAILED)
           .addTransition(SubQueryState.FAILED, SubQueryState.ERROR,
               SubQueryEventType.SQ_INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
+          // Ignore-able transitions
+          .addTransition(SubQueryState.FAILED, SubQueryState.FAILED,
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+                  SubQueryEventType.SQ_FAILED))
 
           // Transitions from FAILED state
           .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
+              SubQueryEventType.SQ_CONTAINER_ALLOCATED,
+              CONTAINERS_CANCEL_TRANSITION)
+          .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
               SubQueryEventType.SQ_DIAGNOSTIC_UPDATE,
               DIAGNOSTIC_UPDATE_TRANSITION)
+          // Ignore-able transitions
           .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
-              SubQueryEventType.SQ_FAILED)
-          .addTransition(SubQueryState.ERROR, SubQueryState.ERROR,
-              SubQueryEventType.SQ_INTERNAL_ERROR)
+              EnumSet.of(
+                  SubQueryEventType.SQ_START,
+                  SubQueryEventType.SQ_KILL,
+                  SubQueryEventType.SQ_FAILED,
+                  SubQueryEventType.SQ_INTERNAL_ERROR))
 
           .installTopology();
 
-
   private final Lock readLock;
   private final Lock writeLock;
 
   private int totalScheduledObjectsCount;
-  private int completedObjectCount = 0;
+  private int succeededObjectCount = 0;
   private int completedTaskCount = 0;
+  private int succeededTaskCount = 0;
+  private int killedObjectCount = 0;
+  private int failedObjectCount = 0;
   private TaskSchedulerContext schedulerContext;
 
   public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan,
ExecutionBlock block, AbstractStorageManager sm) {
@@ -223,8 +263,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   }
 
   public static boolean isRunningState(SubQueryState state) {
-    return state == SubQueryState.INIT || state == SubQueryState.NEW ||
-        state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING;
+    return state == SubQueryState.INITED || state == SubQueryState.NEW || state == SubQueryState.RUNNING;
   }
 
   public QueryMasterTask.QueryMasterTaskContext getContext() {
@@ -271,15 +310,15 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       if (getState() == SubQueryState.NEW) {
         return 0;
       } else {
-        return (float)(completedObjectCount) / (float)totalScheduledObjectsCount;
+        return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount;
       }
     } finally {
       readLock.unlock();
     }
   }
 
-  public int getCompletedObjectCount() {
-    return completedObjectCount;
+  public int getSucceededObjectCount() {
+    return succeededObjectCount;
   }
 
   public int getTotalScheduledObjectsCount() {
@@ -294,15 +333,29 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     tasks.put(task.getId(), task);
   }
 
-  public void abortSubQuery(SubQueryState finalState) {
+  /**
+   * It finalizes this subquery. It is only invoked when the subquery is succeeded.
+   */
+  public void complete() {
+    cleanup();
+    finalizeStats();
+    setFinishTime();
+    eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED));
+  }
+
+  /**
+   * It finalizes this subquery. Unlike {@link SubQuery#complete()},
+   * it is invoked when a subquery is abnormally finished.
+   *
+   * @param finalState The final subquery state
+   */
+  public void abort(SubQueryState finalState) {
     // TODO -
     // - committer.abortSubQuery(...)
     // - record SubQuery Finish Time
     // - CleanUp Tasks
     // - Record History
-
-    stopScheduler();
-    releaseContainers();
+    cleanup();
     setFinishTime();
     eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState));
   }
@@ -394,7 +447,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     }
   }
 
-  public static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
+  private static TableStats computeStatFromUnionBlock(SubQuery subQuery) {
     TableStats stat = new TableStats();
     TableStats childStat;
     long avgRows = 0, numBytes = 0, numRows = 0;
@@ -443,10 +496,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
   private void releaseContainers() {
     // If there are still live TaskRunners, try to kill the containers.
-    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP ,getId(),
containers.values()));
+    eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(),
containers.values()));
   }
 
-  private void finish() {
+  /**
+   * It computes all stats and sets the intermediate result.
+   */
+  private void finalizeStats() {
     TableStats stats;
     if (block.hasUnion()) {
       stats = computeStatFromUnionBlock(this);
@@ -466,9 +522,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     schema = channel.getSchema();
     meta = CatalogUtil.newTableMeta(storeType, new Options());
     statistics = stats;
-    setFinishTime();
-
-    eventHandler.handle(new SubQuerySucceeEvent(getId()));
   }
 
   @Override
@@ -516,7 +569,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
       try {
         // Union operator does not require actual query processing. It is performed logically.
         if (execBlock.hasUnion()) {
-          subQuery.finish();
+          subQuery.finalizeStats();
           state = SubQueryState.SUCCEEDED;
         } else {
           ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock());
@@ -529,12 +582,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
 
           if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there
is no tasks
             subQuery.stopScheduler();
-            subQuery.finish();
+            subQuery.finalizeStats();
+            subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
             return SubQueryState.SUCCEEDED;
           } else {
             subQuery.taskScheduler.start();
             allocateContainers(subQuery);
-            return SubQueryState.INIT;
+            return SubQueryState.INITED;
           }
         }
       } catch (Exception e) {
@@ -826,7 +880,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return unit;
   }
 
-  int i = 0;
   private static class ContainerLaunchTransition
       implements SingleArcTransition<SubQuery, SubQueryEvent> {
 
@@ -838,42 +891,51 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
         for (Container container : allocationEvent.getAllocatedContainer()) {
           ContainerId cId = container.getId();
           if (subQuery.containers.containsKey(cId)) {
-            LOG.info(">>>>>>>>>>>> Duplicate Container!
<<<<<<<<<<<");
+            subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+                "Duplicated containers are allocated: " + cId.toString()));
+            subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
           }
           subQuery.containers.put(cId, container);
-          // TODO - This is debugging message. Should be removed
-          subQuery.i++;
         }
-        LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.i + " containers!");
+        LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size()
+ " containers!");
         subQuery.eventHandler.handle(
             new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH,
                 subQuery.getId(), allocationEvent.getAllocatedContainer()));
 
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START));
       } catch (Throwable t) {
-
+        subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
       }
     }
   }
 
-  private static class StartTransition implements
-      MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
-
+  /**
+   * It is used in KILL_WAIT state against Contained Allocated event.
+   * It just returns allocated containers to resource manager.
+   */
+  private static class AllocatedContainersCancelTransition implements SingleArcTransition<SubQuery,
SubQueryEvent> {
     @Override
-    public SubQueryState transition(SubQuery subQuery,
-                           SubQueryEvent subQueryEvent) {
-      // schedule tasks
+    public void transition(SubQuery subQuery, SubQueryEvent event) {
       try {
-        return  SubQueryState.RUNNING;
-      } catch (Exception e) {
-        LOG.warn("SubQuery (" + subQuery.getId() + ") failed", e);
-        return SubQueryState.FAILED;
+        SubQueryContainerAllocationEvent allocationEvent =
+            (SubQueryContainerAllocationEvent) event;
+        subQuery.eventHandler.handle(
+            new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP,
+                subQuery.getId(), allocationEvent.getAllocatedContainer()));
+        LOG.info(String.format("[%s] %d allocated containers are canceled",
+            subQuery.getId().toString(),
+            allocationEvent.getAllocatedContainer().size()));
+      } catch (Throwable t) {
+        subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(),
+            ExceptionUtils.getStackTrace(t)));
+        subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR));
       }
     }
   }
 
-  private static class TaskCompletedTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+  private static class TaskCompletedTransition implements SingleArcTransition<SubQuery,
SubQueryEvent> {
 
     @Override
     public void transition(SubQuery subQuery,
@@ -882,39 +944,101 @@ public class SubQuery implements EventHandler<SubQueryEvent>
{
       QueryUnit task = subQuery.getQueryUnit(taskEvent.getTaskId());
 
       if (task == null) { // task failed
+        LOG.error(String.format("Task %s is absent", taskEvent.getTaskId()));
         subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED));
       } else {
-        QueryUnitAttempt taskAttempt = task.getSuccessfulAttempt();
-        if (task.isLeafTask()) {
-          subQuery.completedObjectCount += task.getTotalFragmentNum();
-        } else {
-          subQuery.completedObjectCount++;
-        }
         subQuery.completedTaskCount++;
 
-        LOG.info(subQuery.getId() + " SubQuery Succeeded " + subQuery.completedTaskCount
+ "/"
-            + subQuery.schedulerContext.getEstimatedTaskNum() + " on " + taskAttempt.getHost()
+ ":" + taskAttempt.getPort());
-        if (subQuery.taskScheduler.remainingScheduledObjectNum() == 0
-            && subQuery.totalScheduledObjectsCount == subQuery.completedObjectCount)
{
-          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(),
-              SubQueryEventType.SQ_SUBQUERY_COMPLETED));
+        if (taskEvent.getState() == TaskState.SUCCEEDED) {
+          if (task.isLeafTask()) {
+            subQuery.succeededObjectCount += task.getTotalFragmentNum();
+          } else {
+            subQuery.succeededObjectCount++;
+          }
+        } else if (task.getState() == TaskState.KILLED) {
+          if (task.isLeafTask()) {
+            subQuery.killedObjectCount += task.getTotalFragmentNum();
+          } else {
+            subQuery.killedObjectCount++;
+          }
+        } else if (task.getState() == TaskState.FAILED) {
+          if (task.isLeafTask()) {
+            subQuery.failedObjectCount+= task.getTotalFragmentNum();
+          } else {
+            subQuery.failedObjectCount++;
+          }
+
+          // if at least one task is failed, try to kill all tasks.
+          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
+        }
+
+        LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed:
%d, Failed: %d",
+            subQuery.getId(),
+            subQuery.getTotalScheduledObjectsCount(),
+            subQuery.succeededObjectCount,
+            subQuery.killedObjectCount,
+            subQuery.failedObjectCount));
+
+        if (subQuery.totalScheduledObjectsCount ==
+            subQuery.succeededObjectCount + subQuery.killedObjectCount + subQuery.failedObjectCount)
{
+          subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_SUBQUERY_COMPLETED));
         }
       }
     }
   }
 
-  private static class SubQueryCompleteTransition
-      implements SingleArcTransition<SubQuery, SubQueryEvent> {
+  private static class KillTasksTransition implements SingleArcTransition<SubQuery, SubQueryEvent>
{
 
     @Override
     public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
+      subQuery.getTaskScheduler().stop();
+      for (QueryUnit queryUnit : subQuery.getQueryUnits()) {
+        subQuery.eventHandler.handle(new TaskEvent(queryUnit.getId(), TaskEventType.T_KILL));
+      }
+    }
+  }
+
+  private void cleanup() {
+    stopScheduler();
+    releaseContainers();
+  }
+
+  private static class SubQueryCompleteTransition
+      implements MultipleArcTransition<SubQuery, SubQueryEvent, SubQueryState> {
+
+    @Override
+    public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
       // TODO - Commit subQuery & do cleanup
       // TODO - records succeeded, failed, killed completed task
       // TODO - records metrics
-      LOG.info("SubQuery finished:" + subQuery.getId());
-      subQuery.stopScheduler();
-      subQuery.releaseContainers();
-      subQuery.finish();
+      try {
+        LOG.info(String.format("subQuery completed - %s (total=%d, success=%d, killed=%d)",
+            subQuery.getId().toString(),
+            subQuery.getTotalScheduledObjectsCount(),
+            subQuery.getSucceededObjectCount(),
+            subQuery.killedObjectCount));
+
+        if (subQuery.killedObjectCount > 0 || subQuery.failedObjectCount > 0) {
+          if (subQuery.failedObjectCount > 0) {
+            subQuery.abort(SubQueryState.FAILED);
+            return SubQueryState.FAILED;
+          } else if (subQuery.killedObjectCount > 0) {
+            subQuery.abort(SubQueryState.KILLED);
+            return SubQueryState.KILLED;
+          } else {
+            LOG.error("Invalid State " + subQuery.getState() + " State");
+            subQuery.abort(SubQueryState.ERROR);
+            return SubQueryState.ERROR;
+          }
+        } else {
+          subQuery.complete();
+          return SubQueryState.SUCCEEDED;
+        }
+      } catch (Throwable t) {
+        LOG.error(t);
+        subQuery.abort(SubQueryState.ERROR);
+        return SubQueryState.ERROR;
+      }
     }
   }
 
@@ -928,14 +1052,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private static class InternalErrorTransition implements SingleArcTransition<SubQuery,
SubQueryEvent> {
     @Override
     public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      subQuery.abortSubQuery(SubQueryState.ERROR);
-    }
-  }
-
-  private static class FailedTransition implements SingleArcTransition<SubQuery, SubQueryEvent>
{
-    @Override
-    public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) {
-      subQuery.abortSubQuery(SubQueryState.FAILED);
+      subQuery.abort(SubQueryState.ERROR);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
index ce4d209..effcfde 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java
@@ -20,10 +20,11 @@ package org.apache.tajo.master.querymaster;
 
 public enum SubQueryState {
   NEW,
-  CONTAINER_ALLOCATED,
-  INIT,
+  INITED,
   RUNNING,
   SUCCEEDED,
   FAILED,
+  KILL_WAIT,
+  KILLED,
   ERROR
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 16427b6..28386bb 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -21,10 +21,6 @@ package org.apache.tajo.worker;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -53,7 +49,6 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.util.ApplicationIdUtils;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -63,12 +58,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class TajoResourceAllocator extends AbstractResourceAllocator {
   private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class);
 
-  static AtomicInteger containerIdSeq = new AtomicInteger(0);
   private TajoConf tajoConf;
   private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
   private final ExecutorService executorService;
@@ -110,7 +103,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     tajoConf = (TajoConf)conf;
 
     queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
-//
+
     queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
 
     super.init(conf);
@@ -140,23 +133,6 @@ public class TajoResourceAllocator extends AbstractResourceAllocator
{
     super.start();
   }
 
-  final public static FsPermission QUERYCONF_FILE_PERMISSION =
-      FsPermission.createImmutable((short) 0644); // rw-r--r--
-
-  private static void writeConf(Configuration conf, Path queryConfFile)
-      throws IOException {
-    // Write job file to Tajo's fs
-    FileSystem fs = queryConfFile.getFileSystem(conf);
-    FSDataOutputStream out =
-        FileSystem.create(fs, queryConfFile,
-            new FsPermission(QUERYCONF_FILE_PERMISSION));
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
   class TajoTaskRunnerLauncher implements TaskRunnerLauncher {
     @Override
     public void handle(TaskRunnerGroupEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index da3dc34..a73623f 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -130,7 +130,7 @@ public class TajoWorkerClientService extends AbstractService {
             RpcController controller,
             ClientProtos.GetQueryResultRequest request) throws ServiceException {
       QueryId queryId = new QueryId(request.getQueryId());
-      Query query = workerContext.getQueryMaster().getQuery(queryId);
+      Query query = workerContext.getQueryMaster().getQueryMasterTask(queryId, true).getQuery();
 
       ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
       try {
@@ -171,31 +171,35 @@ public class TajoWorkerClientService extends AbstractService {
         builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
       } else {
         QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
+
         builder.setResultCode(ClientProtos.ResultCode.OK);
         builder.setQueryMasterHost(bindAddr.getHostName());
         builder.setQueryMasterPort(bindAddr.getPort());
 
-        if (queryMasterTask != null) {
-          queryMasterTask.touchSessionTime();
-          Query query = queryMasterTask.getQuery();
-
-          builder.setState(query.getState());
-          builder.setProgress(query.getProgress());
-          builder.setSubmitTime(query.getAppSubmitTime());
-          builder.setHasResult(
-              !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
-                  queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
-          );
-          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            builder.setFinishTime(query.getFinishTime());
-          } else {
-            builder.setFinishTime(System.currentTimeMillis());
-          }
-        } else {
+        if (queryMasterTask == null) {
+          queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true);
+        }
+        if (queryMasterTask == null) {
           builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
+          return builder.build();
         }
-      }
 
+        queryMasterTask.touchSessionTime();
+        Query query = queryMasterTask.getQuery();
+
+        builder.setState(query.getState());
+        builder.setProgress(query.getProgress());
+        builder.setSubmitTime(query.getAppSubmitTime());
+        builder.setHasResult(
+            !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
+                queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
+        );
+        if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+          builder.setFinishTime(query.getFinishTime());
+        } else {
+          builder.setFinishTime(System.currentTimeMillis());
+        }
+      }
       return builder.build();
     }
 
@@ -205,12 +209,6 @@ public class TajoWorkerClientService extends AbstractService {
             TajoIdProtos.QueryIdProto request) throws ServiceException {
       final QueryId queryId = new QueryId(request);
       LOG.info("Stop Query:" + queryId);
-      Thread t = new Thread() {
-        public void run() {
-          workerContext.getQueryMaster().getContext().stopQuery(queryId);
-        }
-      };
-      t.start();
       return BOOL_TRUE;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index c770696..392a7cf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -136,6 +137,13 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
+  public void killTaskAttempt(RpcController controller, TajoIdProtos.QueryUnitAttemptIdProto
request,
+                              RpcCallback<PrimitiveProtos.BoolProto> done) {
+    workerContext.getTaskRunnerManager().findTaskByQueryUnitAttemptId(new QueryUnitAttemptId(request)).kill();
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  @Override
   public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
                       RpcCallback<PrimitiveProtos.BoolProto> done) {
     workerContext.cleanup(new QueryId(request).toString());

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index c31e9cd..066e11c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -91,9 +91,10 @@ public class Task {
   private final Reporter reporter;
   private Path inputTableBaseDir;
 
-  private static int completed = 0;
-  private static int failed = 0;
-  private static int succeeded = 0;
+  private static int completedTasksNum = 0;
+  private static int succeededTasksNum = 0;
+  private static int killedTasksNum = 0;
+  private static int failedTasksNum = 0;
 
   private long startTime;
   private long finishTime;
@@ -282,7 +283,6 @@ public class Task {
   public void kill() {
     killed = true;
     context.stop();
-    context.setState(TaskAttemptState.TA_KILLED);
     setProgressFlag();
     releaseChannelFactory();
   }
@@ -290,7 +290,6 @@ public class Task {
   public void abort() {
     aborted = true;
     context.stop();
-    context.setState(TaskAttemptState.TA_FAILED);
     releaseChannelFactory();
   }
 
@@ -372,7 +371,7 @@ public class Task {
         this.executor = taskRunnerContext.getTQueryEngine().
             createPlan(context, plan);
         this.executor.init();
-        while(executor.next() != null && !killed) {
+        while(!killed && executor.next() != null) {
           ++progress;
         }
         this.executor.close();
@@ -386,21 +385,25 @@ public class Task {
     } finally {
       setProgressFlag();
       stopped = true;
-      completed++;
+      completedTasksNum++;
 
       if (killed || aborted) {
         context.setProgress(0.0f);
         if(killed) {
           context.setState(TaskAttemptState.TA_KILLED);
+          masterProxy.statusUpdate(null, getReport(), NullCallback.get());
+          killedTasksNum++;
         } else {
           context.setState(TaskAttemptState.TA_FAILED);
-        }
-
-        TaskFatalErrorReport.Builder errorBuilder =
-            TaskFatalErrorReport.newBuilder()
-            .setId(getId().getProto());
-        if (errorMessage != null) {
+          TaskFatalErrorReport.Builder errorBuilder =
+              TaskFatalErrorReport.newBuilder()
+                  .setId(getId().getProto());
+          if (errorMessage != null) {
             errorBuilder.setErrorMessage(errorMessage);
+          }
+
+          masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
+          failedTasksNum++;
         }
 
         // stopping the status report
@@ -410,9 +413,6 @@ public class Task {
           LOG.warn(e);
         }
 
-        masterProxy.fatalError(null, errorBuilder.build(), NullCallback.get());
-        failed++;
-
       } else {
         // if successful
         context.setProgress(1.0f);
@@ -427,14 +427,14 @@ public class Task {
 
         TaskCompletionReport report = getTaskCompletionReport();
         masterProxy.done(null, report, NullCallback.get());
-        succeeded++;
+        succeededTasksNum++;
       }
 
       finishTime = System.currentTimeMillis();
 
       cleanupTask();
-      LOG.info("Task Counter - total:" + completed + ", succeeded: " + succeeded
-          + ", failed: " + failed);
+      LOG.info("Task Counter - total:" + completedTasksNum + ", succeeded: " + succeededTasksNum
+          + ", killed: " + killedTasksNum + ", failed: " + failedTasksNum);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 7b49c15..9a38aef 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -81,8 +81,7 @@ public class TaskRunner extends AbstractService {
   // for Fetcher
   private final ExecutorService fetchLauncher;
   // It keeps all of the query unit attempts while a TaskRunner is running.
-  private final Map<QueryUnitAttemptId, Task> tasks =
-      new ConcurrentHashMap<QueryUnitAttemptId, Task>();
+  private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId,
Task>();
 
   private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
       new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
index fe5bf03..e12c9aa 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/QueryMasterProtocol.proto
@@ -37,5 +37,6 @@ service QueryMasterProtocolService {
   rpc done (TaskCompletionReport) returns (BoolProto);
 
   //from TajoMaster's QueryJobManager
+  rpc killQuery(QueryIdProto) returns (BoolProto);
   rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index e08da29..3fdd221 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -156,9 +156,9 @@ message RunExecutionBlockRequestProto {
 service TajoWorkerProtocolService {
   rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
 
-  //from QueryMaster(Worker)
+  // from QueryMaster(Worker)
   rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto);
-
+  rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto);
   rpc cleanup(QueryIdProto) returns (BoolProto);
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
index 80259c8..e3a356d 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/admin/query.jsp
@@ -69,7 +69,7 @@
   } else {
 %>
   <table width="100%" border="1" class='border_table'>
-    <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>sql</th></tr>
+    <tr></tr><th>QueryId</th><th>Query Master</th><th>Started</th><th>Progress</th><th>Time</th><th>Status</th></th><th>sql</th></tr>
     <%
       for(QueryInProgress eachQuery: runningQueries) {
         long time = System.currentTimeMillis() - eachQuery.getQueryInfo().getStartTime();
@@ -82,6 +82,7 @@
       <td><%=df.format(eachQuery.getQueryInfo().getStartTime())%></td>
       <td><%=(int)(eachQuery.getQueryInfo().getProgress() * 100.0f)%>%</td>
       <td><%=StringUtils.formatTime(time)%></td>
+      <td><%=eachQuery.getQueryInfo().getQueryState()%></td>
       <td><%=eachQuery.getQueryInfo().getSql()%></td>
     </tr>
     <%

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
index 7656dfb..94e5f2e 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
+++ b/tajo-core/tajo-core-backend/src/main/resources/webapps/worker/querydetail.jsp
@@ -62,7 +62,7 @@
     <tr><th>ID</th><th>State</th><th>Started</th><th>Finished</th><th>Running
time</th><th>Progress</th><th>Tasks</th></tr>
 <%
 for(SubQuery eachSubQuery: subQueries) {
-    eachSubQuery.getCompletedObjectCount();
+    eachSubQuery.getSucceededObjectCount();
     String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachSubQuery.getId();
 %>
   <tr>
@@ -72,7 +72,7 @@ for(SubQuery eachSubQuery: subQueries) {
     <td><%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%></td>
     <td><%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%></td>
     <td align='center'><%=JSPUtil.percentFormat(eachSubQuery.getProgress())%>%</td>
-    <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachSubQuery.getCompletedObjectCount()%></a>/<a
href='<%=detailLink%>&status=ALL'><%=eachSubQuery.getTotalScheduledObjectsCount()%></a></td>
+    <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachSubQuery.getSucceededObjectCount()%></a>/<a
href='<%=detailLink%>&status=ALL'><%=eachSubQuery.getTotalScheduledObjectsCount()%></a></td>
   </tr>
   <%
 }  //end of for

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/ae541ffa/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 13db0d0..ee03bd6 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -24,14 +24,12 @@ import com.sun.org.apache.commons.logging.Log;
 import com.sun.org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.BackendTestingUtil;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.TpchTestBase;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.BeforeClass;
@@ -68,6 +66,15 @@ public class TestTajoClient {
   }
 
   @Test
+  public final void testKillQuery() throws IOException, ServiceException, InterruptedException
{
+    ClientProtos.GetQueryStatusResponse res = client.executeQuery("select sleep(1) from lineitem");
+    Thread.sleep(1000);
+    QueryId queryId = new QueryId(res.getQueryId());
+    client.killQuery(queryId);
+    assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId).getState());
+  }
+
+  @Test
   public final void testUpdateQuery() throws IOException, ServiceException {
     final String tableName = "testUpdateQuery";
     Path tablePath = writeTmpTable(tableName);


Mime
View raw message