tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [6/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via hyunsik)
Date Mon, 26 Aug 2013 12:29:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
deleted file mode 100644
index e6d4c56..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskRunnerLauncherImpl.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.master;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.tajo.SubQueryId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
-import org.apache.tajo.master.event.QueryEvent;
-import org.apache.tajo.master.event.QueryEventType;
-import org.apache.tajo.master.querymaster.QueryMaster;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
-import org.apache.tajo.worker.TaskRunner;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class TaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
-
-  /** Class Logger */
-  private static final Log LOG = LogFactory.getLog(TaskRunnerLauncherImpl.class);
-  private QueryContext context;
-  private final String queryMasterHost;
-  private final int queryMasterPort;
-
-  // For ContainerLauncherSpec
-  private ContainerLaunchContext commonContainerSpec = null;
-
-  /** for launching TaskRunners in parallel */
-  private final ExecutorService executorService;
-
-  public TaskRunnerLauncherImpl(QueryContext context) {
-    super(TaskRunnerLauncherImpl.class.getName());
-    this.context = context;
-    queryMasterHost = context.getQueryMasterServiceAddress().getHostName();
-    queryMasterPort = context.getQueryMasterServiceAddress().getPort();
-    executorService = Executors.newFixedThreadPool(
-        context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
-  }
-
-  public void start() {
-    super.start();
-  }
-
-  public void stop() {
-    executorService.shutdownNow();
-    Map<ContainerId, ContainerProxy> containers = context.getContainers();
-    for(ContainerProxy eachProxy: containers.values()) {
-      try {
-        eachProxy.kill();
-      } catch (Exception e) {
-      }
-    }
-    super.stop();
-  }
-
-  @Override
-  public void handle(TaskRunnerGroupEvent event) {
-    if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
-     launchTaskRunners(event.subQueryId, event.getContainers());
-    } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
-      killTaskRunners(event.getContainers());
-    }
-  }
-
-  private void launchTaskRunners(SubQueryId subQueryId, Collection<Container> containers) {
-    commonContainerSpec = ContainerProxy.createCommonContainerLaunchContext(getConfig(), subQueryId.toString(), false);
-    for (Container container : containers) {
-      final ContainerProxy proxy =
-          new TaskRunnerContainerProxy(context, getConfig(), context.getYarnRPC(), container, subQueryId);
-      executorService.submit(new LaunchRunner(container.getId(), proxy));
-    }
-  }
-
-  private class LaunchRunner implements Runnable {
-    private final ContainerProxy proxy;
-    private final ContainerId id;
-
-    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
-      this.proxy = proxy;
-      this.id = id;
-    }
-    @Override
-    public void run() {
-      proxy.launch(commonContainerSpec);
-      LOG.info("ContainerProxy started:" + id);
-    }
-  }
-
-  private void killTaskRunners(Collection<Container> containers) {
-    for (Container container : containers) {
-      final ContainerProxy proxy = context.getContainer(container.getId());
-      executorService.submit(new KillRunner(container.getId(), proxy));
-    }
-  }
-
-  private class KillRunner implements Runnable {
-    private final ContainerProxy proxy;
-    private final ContainerId id;
-    public KillRunner(ContainerId id, ContainerProxy proxy) {
-      this.id = id;
-      this.proxy = proxy;
-    }
-
-    @Override
-    public void run() {
-      proxy.kill();
-      LOG.info("ContainerProxy killed:" + id);
-    }
-  }
-
-  public class TaskRunnerContainerProxy extends ContainerProxy {
-    private final SubQueryId subQueryId;
-
-    public TaskRunnerContainerProxy(QueryMaster.QueryContext context, Configuration conf, YarnRPC yarnRPC,
-                                    Container container, SubQueryId subQueryId) {
-      super(context, conf, yarnRPC, container);
-      this.subQueryId = subQueryId;
-    }
-
-    @Override
-    protected void containerStarted() {
-      context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
-    }
-
-    @Override
-    protected String getId() {
-      return subQueryId.toString();
-    }
-
-    @Override
-    protected String getRunnerClass() {
-      return TaskRunner.class.getCanonicalName();
-    }
-
-    @Override
-    protected Vector<CharSequence> getTaskParams() {
-      Vector<CharSequence> taskParams = new Vector<CharSequence>();
-      taskParams.add(queryMasterHost); // queryMaster hostname
-      taskParams.add(String.valueOf(queryMasterPort)); // queryMaster port
-
-      return taskParams;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index c9702b4..651f9c0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -23,28 +23,25 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.SubQueryId;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
-import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
 import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
 import org.apache.tajo.master.event.TaskRequestEvent;
-import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
 import org.apache.tajo.master.event.TaskScheduleEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.URI;
 import java.util.*;
@@ -55,8 +52,8 @@ public class TaskSchedulerImpl extends AbstractService
     implements TaskScheduler {
   private static final Log LOG = LogFactory.getLog(TaskScheduleEvent.class);
 
-  private final QueryContext context;
-  private AsyncDispatcher dispatcher;
+  private final QueryMasterTask.QueryContext context;
+  private TajoAsyncDispatcher dispatcher;
 
   private Thread eventHandlingThread;
   private Thread schedulingThread;
@@ -72,21 +69,22 @@ public class TaskSchedulerImpl extends AbstractService
   private int rackLocalAssigned = 0;
   private int totalAssigned = 0;
 
-  public TaskSchedulerImpl(QueryContext context) {
+  public TaskSchedulerImpl(QueryMasterTask.QueryContext context) {
     super(TaskSchedulerImpl.class.getName());
     this.context = context;
     this.dispatcher = context.getDispatcher();
   }
 
+  @Override
   public void init(Configuration conf) {
 
     scheduledRequests = new ScheduledRequests();
     taskRequests  = new TaskRequests();
-    dispatcher.register(TaskRequestEventType.class, taskRequests);
 
     super.init(conf);
   }
 
+  @Override
   public void start() {
     LOG.info("Start TaskScheduler");
     this.eventHandlingThread = new Thread() {
@@ -113,13 +111,14 @@ public class TaskSchedulerImpl extends AbstractService
 
         while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
           try {
-            Thread.sleep(100);
+            Thread.sleep(1000);
           } catch (InterruptedException e) {
             break;
           }
 
           schedule();
         }
+        //req.getCallback().run(stopTaskRunnerReq);
         LOG.info("TaskScheduler schedulingThread stopped");
       }
     };
@@ -128,15 +127,15 @@ public class TaskSchedulerImpl extends AbstractService
     super.start();
   }
 
-  private static final QueryUnitAttemptId NULL_ID;
-  private static final QueryMasterProtocol.QueryUnitRequestProto stopTaskRunnerReq;
+  private static final QueryUnitAttemptId NULL_ATTEMPT_ID;
+  public static final TajoWorkerProtocol.QueryUnitRequestProto stopTaskRunnerReq;
   static {
-    SubQueryId nullSubQuery =
-        QueryIdFactory.newSubQueryId(TajoIdUtils.NullQueryId);
-    NULL_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
+    ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0);
+    NULL_ATTEMPT_ID = QueryIdFactory.newQueryUnitAttemptId(QueryIdFactory.newQueryUnitId(nullSubQuery, 0), 0);
 
-    QueryMasterProtocol.QueryUnitRequestProto.Builder builder = QueryMasterProtocol.QueryUnitRequestProto.newBuilder();
-    builder.setId(NULL_ID.getProto());
+    TajoWorkerProtocol.QueryUnitRequestProto.Builder builder =
+        TajoWorkerProtocol.QueryUnitRequestProto.newBuilder();
+    builder.setId(NULL_ATTEMPT_ID.getProto());
     builder.setShouldDie(true);
     builder.setOutputTable("");
     builder.setSerializedData("");
@@ -144,7 +143,7 @@ public class TaskSchedulerImpl extends AbstractService
     stopTaskRunnerReq = builder.build();
   }
 
-
+  @Override
   public void stop() {
     stopEventHandling = true;
     eventHandlingThread.interrupt();
@@ -205,12 +204,12 @@ public class TaskSchedulerImpl extends AbstractService
   public void handle(TaskSchedulerEvent event) {
     int qSize = eventQueue.size();
     if (qSize != 0 && qSize % 1000 == 0) {
-      LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+      LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
     }
     int remCapacity = eventQueue.remainingCapacity();
     if (remCapacity < 1000) {
       LOG.warn("Very low remaining capacity in the event-queue "
-          + "of RMContainerAllocator: " + remCapacity);
+          + "of YarnRMContainerAllocator: " + remCapacity);
     }
 
     try {
@@ -220,20 +219,29 @@ public class TaskSchedulerImpl extends AbstractService
     }
   }
 
+  public void handleTaskRequestEvent(TaskRequestEvent event) {
+    taskRequests.handle(event);
+  }
+
   private class TaskRequests implements EventHandler<TaskRequestEvent> {
     private final LinkedBlockingQueue<TaskRequestEvent> taskRequestQueue =
         new LinkedBlockingQueue<TaskRequestEvent>();
 
     @Override
     public void handle(TaskRequestEvent event) {
+      LOG.info("====>TaskRequest:" + event.getContainerId() + "," + event.getExecutionBlockId());
+      if(stopEventHandling) {
+        event.getCallback().run(stopTaskRunnerReq);
+        return;
+      }
       int qSize = taskRequestQueue.size();
       if (qSize != 0 && qSize % 1000 == 0) {
-        LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
+        LOG.info("Size of event-queue in YarnRMContainerAllocator is " + qSize);
       }
       int remCapacity = taskRequestQueue.remainingCapacity();
       if (remCapacity < 1000) {
         LOG.warn("Very low remaining capacity in the event-queue "
-            + "of RMContainerAllocator: " + remCapacity);
+            + "of YarnRMContainerAllocator: " + remCapacity);
       }
 
       taskRequestQueue.add(event);
@@ -380,15 +388,16 @@ public class TaskSchedulerImpl extends AbstractService
       return nonLeafTasks.size();
     }
 
-    public Set<QueryUnitAttemptId> AssignedRequest = new HashSet<QueryUnitAttemptId>();
+    public Set<QueryUnitAttemptId> assignedRequest = new HashSet<QueryUnitAttemptId>();
+
     public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
       Iterator<TaskRequestEvent> it = taskRequests.iterator();
-      LOG.info("Got task requests " + taskRequests.size());
 
       TaskRequestEvent taskRequest;
       while (it.hasNext() && leafTasks.size() > 0) {
         taskRequest = it.next();
-        ContainerProxy container = context.getContainer(taskRequest.getContainerId());
+        LOG.info("====> assignToLeafTasks: " + taskRequest.getExecutionBlockId());
+        ContainerProxy container = context.getResourceAllocator().getContainer(taskRequest.getContainerId());
         String host = container.getTaskHostName();
 
         QueryUnitAttemptId attemptId = null;
@@ -443,7 +452,7 @@ public class TaskSchedulerImpl extends AbstractService
 
         if (attemptId != null) {
           QueryUnit task = context.getQuery()
-              .getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
+              .getSubQuery(attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
               new ArrayList<Fragment>(task.getAllFragments()),
@@ -457,7 +466,7 @@ public class TaskSchedulerImpl extends AbstractService
           context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
               taskRequest.getContainerId(),
               host, container.getTaskPort()));
-          AssignedRequest.add(attemptId);
+          assignedRequest.add(attemptId);
 
           totalAssigned++;
           taskRequest.getCallback().run(taskAssign.getProto());
@@ -476,6 +485,7 @@ public class TaskSchedulerImpl extends AbstractService
       TaskRequestEvent taskRequest;
       while (it.hasNext()) {
         taskRequest = it.next();
+        LOG.info("====> assignToNonLeafTasks: " + taskRequest.getExecutionBlockId());
 
         QueryUnitAttemptId attemptId;
         // random allocation
@@ -485,7 +495,8 @@ public class TaskSchedulerImpl extends AbstractService
           LOG.debug("Assigned based on * match");
 
           QueryUnit task;
-          task = context.getSubQuery(attemptId.getSubQueryId()).getQueryUnit(attemptId.getQueryUnitId());
+          task = context.getSubQuery(
+              attemptId.getQueryUnitId().getExecutionBlockId()).getQueryUnit(attemptId.getQueryUnitId());
           QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
               attemptId,
               Lists.newArrayList(task.getAllFragments()),
@@ -504,7 +515,7 @@ public class TaskSchedulerImpl extends AbstractService
             }
           }
 
-          ContainerProxy container = context.getContainer(
+          ContainerProxy container = context.getResourceAllocator().getContainer(
               taskRequest.getContainerId());
           context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
               taskRequest.getContainerId(), container.getTaskHostName(), container.getTaskPort()));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
new file mode 100644
index 0000000..1e650e1
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnContainerProxy.java
@@ -0,0 +1,446 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.event.QueryEvent;
+import org.apache.tajo.master.event.QueryEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+public class YarnContainerProxy extends ContainerProxy {
+  private final static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+  protected final YarnRPC yarnRPC;
+  final protected String containerMgrAddress;
+  protected ContainerToken containerToken;
+
+  public YarnContainerProxy(QueryMasterTask.QueryContext context, Configuration conf, YarnRPC yarnRPC,
+                                  Container container, ExecutionBlockId executionBlockId) {
+    super(context, conf, executionBlockId, container);
+    this.yarnRPC = yarnRPC;
+
+    NodeId nodeId = container.getNodeId();
+    this.containerMgrAddress = nodeId.getHost() + ":" + nodeId.getPort();
+    this.containerToken = container.getContainerToken();
+  }
+
+  protected ContainerManager getCMProxy(ContainerId containerID,
+                                        final String containerManagerBindAddr,
+                                        ContainerToken containerToken)
+      throws IOException {
+    String [] hosts = containerManagerBindAddr.split(":");
+    final InetSocketAddress cmAddr =
+        new InetSocketAddress(hosts[0], Integer.parseInt(hosts[1]));
+    UserGroupInformation user = UserGroupInformation.getCurrentUser();
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Token<ContainerTokenIdentifier> token =
+          ProtoUtils.convertFromProtoFormat(containerToken, cmAddr);
+      // the user in createRemoteUser in this context has to be ContainerID
+      user = UserGroupInformation.createRemoteUser(containerID.toString());
+      user.addToken(token);
+    }
+
+    ContainerManager proxy = user.doAs(new PrivilegedAction<ContainerManager>() {
+      @Override
+      public ContainerManager run() {
+        return (ContainerManager) yarnRPC.getProxy(ContainerManager.class,
+            cmAddr, conf);
+      }
+    });
+
+    return proxy;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public synchronized void launch(ContainerLaunchContext commonContainerLaunchContext) {
+    LOG.info("Launching Container with Id: " + containerID);
+    if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+      state = ContainerState.DONE;
+      LOG.error("Container (" + containerID + " was killed before it was launched");
+      return;
+    }
+
+    ContainerManager proxy = null;
+    try {
+
+      proxy = getCMProxy(containerID, containerMgrAddress,
+          containerToken);
+
+      // Construct the actual Container
+      ContainerLaunchContext containerLaunchContext = createContainerLaunchContext(commonContainerLaunchContext);
+
+      // Now launch the actual container
+      StartContainerRequest startRequest = Records
+          .newRecord(StartContainerRequest.class);
+      startRequest.setContainerLaunchContext(containerLaunchContext);
+      StartContainerResponse response = proxy.startContainer(startRequest);
+
+      ByteBuffer portInfo = response
+          .getServiceResponse(PullServerAuxService.PULLSERVER_SERVICEID);
+
+      if(portInfo != null) {
+        port = PullServerAuxService.deserializeMetaData(portInfo);
+      }
+
+      LOG.info("PullServer port returned by ContainerManager for "
+          + containerID + " : " + port);
+
+      if(port < 0) {
+        this.state = ContainerState.FAILED;
+        throw new IllegalStateException("Invalid shuffle port number "
+            + port + " returned for " + containerID);
+      }
+
+      context.getEventHandler().handle(new QueryEvent(context.getQueryId(), QueryEventType.INIT_COMPLETED));
+
+      this.state = ContainerState.RUNNING;
+      this.hostName = containerMgrAddress.split(":")[0];
+      context.getResourceAllocator().addContainer(containerID, this);
+    } catch (Throwable t) {
+      String message = "Container launch failed for " + containerID + " : "
+          + StringUtils.stringifyException(t);
+      this.state = ContainerState.FAILED;
+      LOG.error(message);
+    } finally {
+      if (proxy != null) {
+        yarnRPC.stopProxy(proxy, conf);
+      }
+    }
+  }
+
+
+  public ContainerLaunchContext createContainerLaunchContext(ContainerLaunchContext commonContainerLaunchContext) {
+    // Setup environment by cloning from common env.
+    Map<String, String> env = commonContainerLaunchContext.getEnvironment();
+    Map<String, String> myEnv = new HashMap<String, String>(env.size());
+    myEnv.putAll(env);
+
+    // Duplicate the ByteBuffers for access by multiple containers.
+    Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+    for (Map.Entry<String, ByteBuffer> entry : commonContainerLaunchContext.getServiceData().entrySet()) {
+      myServiceData.put(entry.getKey(), entry.getValue().duplicate());
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the local resources
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the necessary command to execute the application master
+    Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+    // Set java executable command
+    //LOG.info("Setting up app master command");
+    vargs.add("${JAVA_HOME}" + "/bin/java");
+    // Set Xmx based on am memory size
+    vargs.add("-Xmx2000m");
+    // Set Remote Debugging
+    //if (!context.getQuery().getSubQuery(event.getExecutionBlockId()).isLeafQuery()) {
+    //vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
+    //}
+    // Set class name
+    //vargs.add(getRunnerClass());
+    vargs.add(TajoWorker.class.getCanonicalName());
+    vargs.add("tr");     //workerMode
+    vargs.add(getId()); // subqueryId
+    vargs.add(containerMgrAddress); // nodeId
+    vargs.add(containerID.toString()); // containerId
+    Vector<CharSequence> taskParams = getTaskParams();
+    if(taskParams != null) {
+      vargs.addAll(taskParams);
+    }
+
+    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+    // Get final command
+    StringBuilder command = new StringBuilder();
+    for (CharSequence str : vargs) {
+      command.append(str).append(" ");
+    }
+
+    LOG.info("Completed setting up TaskRunner command " + command.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(command.toString());
+
+    return BuilderUtils.newContainerLaunchContext(containerID, commonContainerLaunchContext.getUser(),
+        container.getResource(), commonContainerLaunchContext.getLocalResources(), myEnv, commands,
+        myServiceData, null, new HashMap<ApplicationAccessType, String>());
+  }
+
+  public static ContainerLaunchContext createCommonContainerLaunchContext(Configuration config,
+                                                                          String queryId, boolean isMaster) {
+    TajoConf conf = (TajoConf)config;
+
+    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+    try {
+      ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the env variables to be setup
+    ////////////////////////////////////////////////////////////////////////////
+    LOG.info("Set the environment for the application master");
+
+    Map<String, String> environment = new HashMap<String, String>();
+    //String initialClassPath = getInitialClasspath(conf);
+    environment.put(ApplicationConstants.Environment.SHELL.name(), "/bin/bash");
+    if(System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()) != null) {
+      environment.put(ApplicationConstants.Environment.JAVA_HOME.name(), System.getenv(ApplicationConstants.Environment.JAVA_HOME.name()));
+    }
+
+    // TODO - to be improved with org.apache.tajo.sh shell script
+    Properties prop = System.getProperties();
+
+    if (prop.getProperty("tajo.test", "FALSE").equalsIgnoreCase("TRUE") ||
+        (System.getenv("tajo.test") != null && System.getenv("tajo.test").equalsIgnoreCase("TRUE"))) {
+      LOG.info("=========> tajo.test is TRUE");
+      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), prop.getProperty(
+          "java.class.path", null));
+      environment.put("tajo.test", "TRUE");
+    } else {
+      // Add AppMaster.jar location to classpath
+      // At some point we should not be required to add
+      // the hadoop specific classpaths to the env.
+      // It should be provided out of the box.
+      // For now setting all required classpaths including
+      // the classpath to "." for the application jar
+      StringBuilder classPathEnv = new StringBuilder("./");
+      //for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH)) {
+      for (String c : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH) {
+        classPathEnv.append(':');
+        classPathEnv.append(c.trim());
+      }
+
+      classPathEnv.append(":" + System.getenv("TAJO_BASE_CLASSPATH"));
+      classPathEnv.append(":./log4j.properties:./*");
+      if(System.getenv("HADOOP_HOME") != null) {
+        environment.put("HADOOP_HOME", System.getenv("HADOOP_HOME"));
+        environment.put(
+            ApplicationConstants.Environment.HADOOP_COMMON_HOME.name(),
+            System.getenv("HADOOP_HOME"));
+        environment.put(
+            ApplicationConstants.Environment.HADOOP_HDFS_HOME.name(),
+            System.getenv("HADOOP_HOME"));
+        environment.put(
+            ApplicationConstants.Environment.HADOOP_YARN_HOME.name(),
+            System.getenv("HADOOP_HOME"));
+      }
+
+      if(System.getenv("TAJO_BASE_CLASSPATH") != null) {
+        environment.put("TAJO_BASE_CLASSPATH", System.getenv("TAJO_BASE_CLASSPATH"));
+      }
+      environment.put(ApplicationConstants.Environment.CLASSPATH.name(), classPathEnv.toString());
+    }
+
+    ctx.setEnvironment(environment);
+
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("=================================================");
+      for(Map.Entry<String, String> entry: environment.entrySet()) {
+        LOG.debug(entry.getKey() + "=" + entry.getValue());
+      }
+      LOG.debug("=================================================");
+    }
+    ////////////////////////////////////////////////////////////////////////////
+    // Set the local resources
+    ////////////////////////////////////////////////////////////////////////////
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    FileSystem fs = null;
+
+    LOG.info("defaultFS: " + conf.get("fs.default.name"));
+    LOG.info("defaultFS: " + conf.get("fs.defaultFS"));
+    try {
+      fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    FileContext fsCtx = null;
+    try {
+      fsCtx = FileContext.getFileContext(conf);
+    } catch (UnsupportedFileSystemException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    LOG.info("Writing a QueryConf to HDFS and add to local environment");
+    //Path queryConfPath = new Path(fs.getHomeDirectory(), QueryConf.FILENAME);
+    try {
+      //writeConf(conf, queryConfPath);
+      // TODO move to tajo temp
+      Path warehousePath = new Path(conf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
+      Path queryConfPath = new Path(warehousePath, queryId);
+      if(isMaster) {
+        queryConfPath = new Path(queryConfPath, QueryConf.QUERY_MASTER_FILENAME);
+      } else {
+        queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
+      }
+
+      if(!fs.exists(queryConfPath)){
+        writeConf(conf, queryConfPath);
+      } else {
+        LOG.warn("QueryConf already exist. path: "  + queryConfPath.toString());
+      }
+
+      LocalResource queryConfSrc = createApplicationResource(fsCtx,
+          queryConfPath, LocalResourceType.FILE);
+//        localResources.put(QueryConf.FILENAME,  queryConfSrc);
+      localResources.put(queryConfPath.getName(), queryConfSrc);
+
+      ctx.setLocalResources(localResources);
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+
+    // TODO - move to sub-class
+    // Add shuffle token
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+    try {
+      //LOG.info("Putting shuffle token in serviceData");
+      serviceData.put(PullServerAuxService.PULLSERVER_SERVICEID,
+          PullServerAuxService.serializeMetaData(0));
+    } catch (IOException ioe) {
+      LOG.error(ioe);
+    }
+    ctx.setServiceData(serviceData);
+
+    return ctx;
+  }
+
+  /** Describes the file at {@code p} as an APPLICATION-visible LocalResource of the given type. */
+  private static LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type)
+      throws IOException {
+    final LocalResource resource = recordFactory.newRecordInstance(LocalResource.class);
+    final FileStatus status = fs.getFileStatus(p);
+    final Path resolved = fs.getDefaultFileSystem().resolvePath(status.getPath());
+    resource.setResource(ConverterUtils.getYarnUrlFromPath(resolved));
+    resource.setSize(status.getLen());
+    resource.setTimestamp(status.getModificationTime());
+    resource.setType(type);
+    resource.setVisibility(LocalResourceVisibility.APPLICATION);
+    return resource;
+  }
+
+  /** Writes {@code conf} as XML to {@code queryConfFile}, created with QUERYCONF_FILE_PERMISSION. */
+  private static void writeConf(Configuration conf, Path queryConfFile) throws IOException {
+    // The file system is obtained from the target path itself.
+    final FileSystem targetFs = queryConfFile.getFileSystem(conf);
+    final FsPermission permission = new FsPermission(QUERYCONF_FILE_PERMISSION);
+    final FSDataOutputStream stream = FileSystem.create(targetFs, queryConfFile, permission);
+    try {
+      conf.writeXml(stream);
+    } finally {
+      // Close even when writing fails so the stream is not leaked.
+      stream.close();
+    }
+  }
+
+  /**
+   * Stops this container, sending a stop request to the container manager unless it is in PREP.
+   *
+   * A container still in PREP is only marked KILLED_BEFORE_LAUNCH. Otherwise a stop request is
+   * sent and, on success, the container is removed from the resource allocator. The state ends
+   * up DONE whether or not the request succeeded; a failure is logged and otherwise ignored.
+   */
+  @Override
+  public synchronized void stopContainer() {
+    if(isCompletelyDone()) {
+      return;
+    }
+
+    if(this.state == ContainerState.PREP) {
+      // still in PREP: only the state changes, no stop request is sent
+      this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+      return;
+    }
+
+    LOG.info("KILLING " + containerID);
+    ContainerManager cmProxy = null;
+    try {
+      cmProxy = getCMProxy(this.containerID, this.containerMgrAddress, this.containerToken);
+
+      // kill the remote container if already launched
+      StopContainerRequest request = Records.newRecord(StopContainerRequest.class);
+      request.setContainerId(this.containerID);
+      cmProxy.stopContainer(request);
+      // If stopContainer returns without an error, assuming the stop made
+      // it over to the NodeManager.
+      context.getResourceAllocator().removeContainer(containerID);
+    } catch (Throwable t) {
+      // ignore the cleanup failure
+      String message = "cleanup failed for container " + this.containerID + " : "
+          + StringUtils.stringifyException(t);
+      LOG.warn(message);
+      // set here as well so the state is already DONE when the finally block runs
+      this.state = ContainerState.DONE;
+    } finally {
+      if (cmProxy != null) {
+        yarnRPC.stopProxy(cmProxy, conf);
+      }
+    }
+    this.state = ContainerState.DONE;
+  }
+
+  /** Returns the task parameters: the query master's bind host name, its port, the output path. */
+  protected Vector<CharSequence> getTaskParams() {
+    final String host = context.getQueryMasterContext().getWorkerContext()
+        .getTajoWorkerManagerService().getBindAddr().getHostName();
+    final int port = context.getQueryMasterContext().getWorkerContext()
+        .getTajoWorkerManagerService().getBindAddr().getPort();
+    final Vector<CharSequence> params = new Vector<CharSequence>();
+    params.add(host); // queryMaster hostname
+    params.add(Integer.toString(port)); // queryMaster port
+    params.add(context.getOutputPath().toString());
+    return params;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
new file mode 100644
index 0000000..5ac4fb5
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Launches and stops TaskRunner containers through YARN.
+ *
+ * Each {@link TaskRunnerGroupEvent} is turned into one task per container, and the tasks run
+ * on a fixed-size thread pool so that several containers can be launched or stopped in
+ * parallel. The pool size is read from
+ * {@link TajoConf.ConfVars#AM_TASKRUNNER_LAUNCH_PARALLEL_NUM}.
+ */
+public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskRunnerLauncher {
+
+  /** Class Logger */
+  private static final Log LOG = LogFactory.getLog(YarnTaskRunnerLauncherImpl.class);
+
+  /** Permission for query configuration files. */
+  final public static FsPermission QUERYCONF_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  private final QueryMasterTask.QueryContext context;
+
+  private final YarnRPC yarnRPC;
+
+  /** for launching TaskRunners in parallel */
+  private final ExecutorService executorService;
+
+  /** Launch context built by the most recent launch request; runners keep their own reference. */
+  private ContainerLaunchContext commonContainerSpec = null;
+
+  /**
+   * @param context query context providing the configuration and the resource allocator
+   * @param yarnRPC YARN RPC instance handed to every container proxy this launcher creates
+   */
+  public YarnTaskRunnerLauncherImpl(QueryMasterTask.QueryContext context, YarnRPC yarnRPC) {
+    super(YarnTaskRunnerLauncherImpl.class.getName());
+    this.context = context;
+    this.yarnRPC = yarnRPC;
+    executorService = Executors.newFixedThreadPool(
+        context.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+  }
+
+  /** Starts the service; this launcher adds no start-up work of its own. */
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * Shuts the launcher pool down, then stops every container still known to the resource
+   * allocator before stopping the service itself.
+   */
+  @Override
+  public void stop() {
+    executorService.shutdownNow();
+
+    while(!executorService.isTerminated()) {
+      LOG.info("====>executorService.isTerminated:" + executorService.isTerminated() + "," + executorService.isShutdown());
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller and stop waiting: with the flag restored,
+        // sleeping again would fail immediately and turn this loop into a busy spin.
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+
+    // Iterate over a snapshot: stopping a proxy may remove its entry from the allocator,
+    // which could otherwise invalidate an iterator over the live map view.
+    Map<ContainerId, ContainerProxy> containers = context.getResourceAllocator().getContainers();
+    for(ContainerProxy eachProxy: containers.values().toArray(new ContainerProxy[0])) {
+      try {
+        eachProxy.stopContainer();
+      } catch (Exception e) {
+        // best effort: one failing container must not keep the others from being stopped
+        LOG.warn("Failed to stop a container while stopping the launcher", e);
+      }
+    }
+    super.stop();
+  }
+
+  /**
+   * Dispatches a task-runner group event: CONTAINER_REMOTE_LAUNCH launches the event's
+   * containers and CONTAINER_REMOTE_CLEANUP stops them. Other event types are ignored.
+   *
+   * @param event the request to handle
+   */
+  @Override
+  public void handle(TaskRunnerGroupEvent event) {
+    if (event.getType() == EventType.CONTAINER_REMOTE_LAUNCH) {
+      launchTaskRunners(event.executionBlockId, event.getContainers());
+    } else if (event.getType() == EventType.CONTAINER_REMOTE_CLEANUP) {
+      stopTaskRunners(event.getContainers());
+    }
+  }
+
+  /**
+   * Builds one launch context for the query of the given execution block and submits a launch
+   * task per container to the pool.
+   */
+  private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+    commonContainerSpec = YarnContainerProxy.createCommonContainerLaunchContext(getConfig(),
+        executionBlockId.getQueryId().toString(), false);
+    for (Container container : containers) {
+      final ContainerProxy proxy = new YarnContainerProxy(context, getConfig(),
+          yarnRPC, container, executionBlockId);
+      executorService.submit(new LaunchRunner(container.getId(), proxy));
+    }
+  }
+
+  /**
+   * Launches one container, using the launch context that was current when the runner was created.
+   */
+  protected class LaunchRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+    private final ContainerLaunchContext containerSpec;
+
+    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+      this.proxy = proxy;
+      this.id = id;
+      // Capture the spec now; the shared field may be replaced by a later launch request
+      // before this runner gets a thread.
+      this.containerSpec = commonContainerSpec;
+    }
+
+    @Override
+    public void run() {
+      proxy.launch(containerSpec);
+      LOG.info("ContainerProxy started:" + id);
+    }
+  }
+
+  /**
+   * Submits a stop task to the pool for every given container for which the resource allocator
+   * still returns a proxy; containers without one are logged and skipped.
+   */
+  private void stopTaskRunners(Collection<Container> containers) {
+    for (Container container : containers) {
+      final ContainerProxy proxy = context.getResourceAllocator().getContainer(container.getId());
+      if (proxy == null) {
+        // Nothing to stop; submitting a null proxy would only fail inside the worker thread.
+        LOG.warn("No ContainerProxy found for " + container.getId() + "; skipping stop");
+        continue;
+      }
+      executorService.submit(new StopContainerRunner(container.getId(), proxy));
+    }
+  }
+
+  /** Stops one container through its proxy. */
+  private class StopContainerRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+
+    public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      proxy.stopContainer();
+      LOG.info("ContainerProxy stopped:" + id);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
index 6704aa4..ee594a3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/ContainerAllocationEvent.java
@@ -21,11 +21,11 @@ package org.apache.tajo.master.event;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 
 public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEventType>  {
 
-  private final SubQueryId subQueryId;
+  private final ExecutionBlockId executionBlockId;
   private final Priority priority;
   private final Resource resource;
   private final boolean isLeafQuery;
@@ -33,13 +33,13 @@ public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEv
   private final float progress;
 
   public ContainerAllocationEvent(ContainerAllocatorEventType eventType,
-                                         SubQueryId subQueryId,
-                                         Priority priority,
-                                         Resource resource,
-                                         int requiredNum,
-                                         boolean isLeafQuery, float progress) {
+                                  ExecutionBlockId executionBlockId,
+                                  Priority priority,
+                                  Resource resource,
+                                  int requiredNum,
+                                  boolean isLeafQuery, float progress) {
     super(eventType);
-    this.subQueryId = subQueryId;
+    this.executionBlockId = executionBlockId;
     this.priority = priority;
     this.resource = resource;
     this.requiredNum = requiredNum;
@@ -47,8 +47,8 @@ public class ContainerAllocationEvent extends AbstractEvent<ContainerAllocatorEv
     this.progress = progress;
   }
 
-  public SubQueryId getSubQueryId() {
-    return subQueryId;
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
   }
 
   public Priority getPriority() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
index 44abf30..c34b174 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/GrouppedContainerAllocatorEvent.java
@@ -20,7 +20,7 @@ package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 
 import java.util.Map;
 
@@ -29,12 +29,12 @@ public class GrouppedContainerAllocatorEvent
   private final Map<String, Integer> requestMap;
 
   public GrouppedContainerAllocatorEvent(ContainerAllocatorEventType eventType,
-                                         SubQueryId subQueryId,
+                                         ExecutionBlockId executionBlockId,
                                          Priority priority,
                                          Resource resource,
                                          Map<String, Integer> requestMap,
                                          boolean isLeafQuery, float progress) {
-    super(eventType, subQueryId, priority,
+    super(eventType, executionBlockId, priority,
         resource, requestMap.size(), isLeafQuery, progress);
     this.requestMap = requestMap;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
new file mode 100644
index 0000000..6ae8ff7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QueryStartEvent.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.QueryId;
+
+/** Event announcing that a query should start; carries the query id and logicalPlanJson string. */
+public class QueryStartEvent extends AbstractEvent<QueryStartEvent.EventType> {
+  public enum EventType { QUERY_START }
+
+  private final QueryId queryId;
+  private final String logicalPlanJson;
+
+  /** Creates a QUERY_START event for {@code queryId} carrying {@code logicalPlanJson}. */
+  public QueryStartEvent(QueryId queryId, String logicalPlanJson) {
+    super(EventType.QUERY_START);
+    this.queryId = queryId;
+    this.logicalPlanJson = logicalPlanJson;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public String getLogicalPlanJson() {
+    return logicalPlanJson;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getName() + "," + getType() + "," + queryId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
index 10b67fe..ae36a69 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java
@@ -18,18 +18,18 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 
 public class QuerySubQueryEvent extends QueryEvent {
-  private SubQueryId subQueryId;
+  private ExecutionBlockId executionBlockId;
 
-  public QuerySubQueryEvent(final SubQueryId id,
+  public QuerySubQueryEvent(final ExecutionBlockId id,
                             final QueryEventType queryEvent) {
     super(id.getQueryId(), queryEvent);
-    this.subQueryId = id;
+    this.executionBlockId = id;
   }
 
-  public SubQueryId getSubQueryId() {
-    return this.subQueryId;
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
index 26c7231..7e07525 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java
@@ -18,22 +18,22 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.master.querymaster.SubQueryState;
 
 public class SubQueryCompletedEvent extends QueryEvent {
-  private final SubQueryId subQueryId;
+  private final ExecutionBlockId executionBlockId;
   private final SubQueryState finalState;
 
-  public SubQueryCompletedEvent(final SubQueryId subQueryId,
+  public SubQueryCompletedEvent(final ExecutionBlockId executionBlockId,
                                 SubQueryState finalState) {
-    super(subQueryId.getQueryId(), QueryEventType.SUBQUERY_COMPLETED);
-    this.subQueryId = subQueryId;
+    super(executionBlockId.getQueryId(), QueryEventType.SUBQUERY_COMPLETED);
+    this.executionBlockId = executionBlockId;
     this.finalState = finalState;
   }
 
-  public SubQueryId getSubQueryId() {
-    return subQueryId;
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
   }
 
   public SubQueryState getFinalState() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
index 5c0ef9a..a8f4800 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java
@@ -19,14 +19,14 @@
 package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 
 import java.util.List;
 
 public class SubQueryContainerAllocationEvent extends SubQueryEvent {
   private List<Container> allocatedContainer;
 
-  public SubQueryContainerAllocationEvent(final SubQueryId id,
+  public SubQueryContainerAllocationEvent(final ExecutionBlockId id,
                                           List<Container> allocatedContainer) {
     super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED);
     this.allocatedContainer = allocatedContainer;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
index 11470ed..2b3d598 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java
@@ -19,17 +19,17 @@
 package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 
 public class SubQueryEvent extends AbstractEvent<SubQueryEventType> {
-  private final SubQueryId id;
+  private final ExecutionBlockId id;
 
-  public SubQueryEvent(SubQueryId id, SubQueryEventType subQueryEventType) {
+  public SubQueryEvent(ExecutionBlockId id, SubQueryEventType subQueryEventType) {
     super(subQueryEventType);
     this.id = id;
   }
 
-  public SubQueryId getSubQueryId() {
+  public ExecutionBlockId getSubQueryId() {
     return id;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
index d85d4f2..e02196a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQuerySucceeEvent.java
@@ -18,14 +18,14 @@
 
 package org.apache.tajo.master.event;
 
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.master.querymaster.SubQueryState;
 
 public class SubQuerySucceeEvent extends SubQueryCompletedEvent {
   private final TableMeta tableMeta;
 
-  public SubQuerySucceeEvent(final SubQueryId id, TableMeta tableMeta) {
+  public SubQuerySucceeEvent(final ExecutionBlockId id, TableMeta tableMeta) {
     super(id, SubQueryState.SUCCEEDED);
     this.tableMeta = tableMeta;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
index 0315236..0217f20 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java
@@ -27,7 +27,7 @@ public class SubQueryTaskEvent extends SubQueryEvent {
   private QueryUnitId taskId;
   public SubQueryTaskEvent(QueryUnitId taskId,
                            SubQueryEventType subQueryEventType) {
-    super(taskId.getSubQueryId(), subQueryEventType);
+    super(taskId.getExecutionBlockId(), subQueryEventType);
     this.taskId = taskId;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
index bc84011..d980e05 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskAttemptStatusUpdateEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.ipc.QueryMasterProtocol.TaskStatusProto;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskStatusProto;
 
 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
   private final TaskStatusProto status;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
index e3a4b5f..3ee389a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskCompletionEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.ipc.QueryMasterProtocol.TaskCompletionReport;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskCompletionReport;
 
 public class TaskCompletionEvent extends TaskAttemptEvent {
   private TaskCompletionReport report;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
index 06fb392..d70de8a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.ipc.QueryMasterProtocol.TaskFatalErrorReport;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TaskFatalErrorReport;
 
 public class TaskFatalErrorEvent extends TaskAttemptEvent {
   private TaskFatalErrorReport report;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
index 166e103..9be7cab 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskRequestEvent.java
@@ -21,7 +21,8 @@ package org.apache.tajo.master.event;
 import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryUnitRequestProto;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.ipc.TajoWorkerProtocol.QueryUnitRequestProto;
 import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
 
 public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
@@ -31,12 +32,16 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
   }
 
   private final ContainerId workerId;
+  private final ExecutionBlockId executionBlockId;
+
   private final RpcCallback<QueryUnitRequestProto> callback;
 
   public TaskRequestEvent(ContainerId workerId,
+                          ExecutionBlockId executionBlockId,
                           RpcCallback<QueryUnitRequestProto> callback) {
     super(TaskRequestEventType.TASK_REQ);
     this.workerId = workerId;
+    this.executionBlockId = executionBlockId;
     this.callback = callback;
   }
 
@@ -44,6 +49,10 @@ public class TaskRequestEvent extends AbstractEvent<TaskRequestEventType> {
     return this.workerId;
   }
 
+  public ExecutionBlockId getExecutionBlockId() {
+    return executionBlockId;
+  }
+
   public RpcCallback<QueryUnitRequestProto> getCallback() {
     return this.callback;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
index 1f87356..f460203 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
@@ -34,7 +34,7 @@ public class TaskScheduleEvent extends TaskSchedulerEvent {
                            final EventType eventType, boolean isLeafQuery,
                            final List<QueryUnit.DataLocation> dataLocations,
                            final String[] racks) {
-    super(eventType, attemptId.getSubQueryId());
+    super(eventType, attemptId.getQueryUnitId().getExecutionBlockId());
     this.attemptId = attemptId;
     this.isLeafQuery = isLeafQuery;
     this.dataLocations = dataLocations;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
index d73bb87..71d8587 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskSchedulerEvent.java
@@ -19,7 +19,7 @@
 package org.apache.tajo.master.event;
 
 import org.apache.hadoop.yarn.event.AbstractEvent;
-import org.apache.tajo.SubQueryId;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
 
 public class TaskSchedulerEvent extends AbstractEvent<EventType> {
@@ -28,14 +28,14 @@ public class TaskSchedulerEvent extends AbstractEvent<EventType> {
     T_SUBQUERY_COMPLETED
   }
 
-  private final SubQueryId subQueryId;
+  private final ExecutionBlockId executionBlockId;
 
-  public TaskSchedulerEvent(EventType eventType, SubQueryId subQueryId) {
+  public TaskSchedulerEvent(EventType eventType, ExecutionBlockId queryBlockId) {
     super(eventType);
-    this.subQueryId = subQueryId;
+    this.executionBlockId = queryBlockId;
   }
 
-  public SubQueryId getSubQueryId() {
-    return this.subQueryId;
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 3179abf..99b7c62 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.*;
+import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryConf;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.SubQueryId;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableDescImpl;
@@ -35,7 +35,6 @@ import org.apache.tajo.engine.planner.global.MasterPlan;
 import org.apache.tajo.master.ExecutionBlock;
 import org.apache.tajo.master.ExecutionBlockCursor;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
 import org.apache.tajo.storage.StorageManager;
 
 import java.io.IOException;
@@ -50,16 +49,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 public class Query implements EventHandler<QueryEvent> {
   private static final Log LOG = LogFactory.getLog(Query.class);
 
-
   // Facilities for Query
   private final QueryConf conf;
   private final Clock clock;
   private String queryStr;
-  private Map<SubQueryId, SubQuery> subqueries;
+  private Map<ExecutionBlockId, SubQuery> subqueries;
   private final EventHandler eventHandler;
   private final MasterPlan plan;
   private final StorageManager sm;
-  private QueryContext context;
+  private QueryMasterTask.QueryContext context;
   private ExecutionBlockCursor cursor;
 
   // Query Status
@@ -106,22 +104,21 @@ public class Query implements EventHandler<QueryEvent> {
 
       .installTopology();
 
-  public Query(final QueryContext context, final QueryId id, Clock clock,
+  public Query(final QueryMasterTask.QueryContext context, final QueryId id,
                final long appSubmitTime,
                final String queryStr,
                final EventHandler eventHandler,
-               final MasterPlan plan,
-               final StorageManager sm) {
+               final MasterPlan plan) {
     this.context = context;
     this.conf = context.getConf();
     this.id = id;
-    this.clock = clock;
+    this.clock = context.getClock();
     this.appSubmitTime = appSubmitTime;
     this.queryStr = queryStr;
     subqueries = Maps.newHashMap();
     this.eventHandler = eventHandler;
     this.plan = plan;
-    this.sm = sm;
+    this.sm = context.getStorageManager();
     cursor = new ExecutionBlockCursor(plan);
 
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -131,23 +128,19 @@ public class Query implements EventHandler<QueryEvent> {
     stateMachine = stateMachineFactory.make(this);
   }
 
-  public boolean isCreateTableStmt() {
-    return context.isCreateTableQuery();
-  }
-
-//  protected FileSystem getFileSystem(Configuration conf) throws IOException {
-//    return FileSystem.get(conf);
-//  }
-
   public float getProgress() {
     QueryState state = getStateMachine().getCurrentState();
     if (state == QueryState.QUERY_SUCCEEDED) {
       return 1.0f;
     } else {
       int idx = 0;
-      float [] subProgresses = new float[subqueries.size()];
+      List<SubQuery> tempSubQueries = new ArrayList<SubQuery>();
+      synchronized(subqueries) {
+        tempSubQueries.addAll(subqueries.values());
+      }
+      float [] subProgresses = new float[tempSubQueries.size()];
       boolean finished = true;
-      for (SubQuery subquery: subqueries.values()) {
+      for (SubQuery subquery: tempSubQueries) {
         if (subquery.getState() != SubQueryState.NEW) {
           subProgresses[idx] = subquery.getProgress();
           if (finished && subquery.getState() != SubQueryState.SUCCEEDED) {
@@ -239,8 +232,8 @@ public class Query implements EventHandler<QueryEvent> {
   public QueryId getId() {
     return this.id;
   }
-  
-  public SubQuery getSubQuery(SubQueryId id) {
+
+  public SubQuery getSubQuery(ExecutionBlockId id) {
     return this.subqueries.get(id);
   }
 
@@ -263,7 +256,7 @@ public class Query implements EventHandler<QueryEvent> {
     @Override
     public QueryState transition(Query query, QueryEvent queryEvent) {
       query.setStartTime();
-      query.context.setState(QueryState.QUERY_INIT);
+      //query.context.setState(QueryState.QUERY_INIT);
       return QueryState.QUERY_INIT;
     }
   }
@@ -277,7 +270,8 @@ public class Query implements EventHandler<QueryEvent> {
           query.sm);
       subQuery.setPriority(query.priority--);
       query.addSubQuery(subQuery);
-      LOG.info("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+      LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan());
+
       subQuery.handle(new SubQueryEvent(subQuery.getId(),
           SubQueryEventType.SQ_INIT));
     }
@@ -301,13 +295,16 @@ public class Query implements EventHandler<QueryEvent> {
           query.addSubQuery(nextSubQuery);
           nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(),
               SubQueryEventType.SQ_INIT));
-          LOG.info("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
-          LOG.info("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+          LOG.info("Scheduling SubQuery:" + nextSubQuery.getId());
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority());
+            LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan());
+          }
           return query.checkQueryForCompleted();
 
         } else { // Finish a query
           if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
-            SubQuery subQuery = query.getSubQuery(castEvent.getSubQueryId());
+            SubQuery subQuery = query.getSubQuery(castEvent.getExecutionBlockId());
             TableDesc desc = new TableDescImpl(query.conf.getOutputTable(),
                 subQuery.getTableMeta(), query.context.getOutputPath());
             query.setResultDesc(desc);
@@ -319,7 +316,7 @@ public class Query implements EventHandler<QueryEvent> {
             query.eventHandler.handle(new QueryFinishEvent(query.getId()));
 
             if (query.context.isCreateTableQuery()) {
-              // TOOD move to QueryMasterManager
+              // TODO move to QueryJobManager
               //query.context.getCatalog().addTable(desc);
             }
           }
@@ -363,7 +360,6 @@ public class Query implements EventHandler<QueryEvent> {
 
   public QueryState finished(QueryState finalState) {
     setFinishTime();
-    context.setState(finalState);
     return finalState;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
new file mode 100644
index 0000000..2e2870f
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.engine.planner.logical.LogicalRootNode;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.WorkerResourceManager;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class QueryInProgress extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName());
+
+  private QueryId queryId;
+
+  private AsyncDispatcher dispatcher;
+
+  private LogicalRootNode plan;
+
+  private AtomicBoolean querySubmitted = new AtomicBoolean(false);
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private QueryInfo queryInfo;
+
+  private final TajoMaster.MasterContext masterContext;
+
+  private ProtoAsyncRpcClient queryMasterRpc;
+
+  private TajoWorkerProtocol.TajoWorkerProtocolService queryMasterRpcClient;
+
+  public QueryInProgress(
+      TajoMaster.MasterContext masterContext,
+      QueryId queryId, String sql, LogicalRootNode plan) {
+    super(QueryInProgress.class.getName());
+    this.masterContext = masterContext;
+    this.queryId = queryId;
+    this.plan = plan;
+
+    queryInfo = new QueryInfo(queryId, sql);
+    queryInfo.setStartTime(System.currentTimeMillis());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    dispatcher = new AsyncDispatcher();
+    this.addService(dispatcher);
+
+    dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler());
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    synchronized(stopped) {
+      if(stopped.get()) {
+        return;
+      }
+      stopped.set(true);
+    }
+    LOG.info("=========================================================");
+    LOG.info("Stop query:" + queryId);
+
+    masterContext.getResourceManager().stopQueryMaster(queryId);
+
+    boolean queryMasterStopped = false;
+    long startTime = System.currentTimeMillis();
+    while(true) {
+      try {
+        if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
+          LOG.info("====> " + queryId + " QueryMaster stopped");
+          queryMasterStopped = true;
+          break;
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        break;
+      }
+      if(System.currentTimeMillis() - startTime > 60 * 1000) {
+        LOG.warn("Failed to stop QueryMaster:" + queryId);
+        break;
+      }
+    }
+
+    if(queryMasterRpc != null) {
+      //TODO release to connection pool
+      queryMasterRpc.close();
+    }
+    super.stop();
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  public EventHandler getEventHandler() {
+    return dispatcher.getEventHandler();
+  }
+
+  public void startQueryMaster() {
+    try {
+      LOG.info("Initializing QueryInProgress for QueryID=" + queryId);
+      WorkerResourceManager resourceManager = masterContext.getResourceManager();
+      WorkerResource queryMasterResource = resourceManager.allocateQueryMaster(this);
+
+      if(queryMasterResource != null) {
+        queryInfo.setQueryMasterResource(queryMasterResource);
+      }
+      getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo));
+    } catch (Exception e) {
+      catchException(e);
+    }
+  }
+
+  class QueryInProgressEventHandler implements EventHandler<QueryJobEvent> {
+    @Override
+    public void handle(QueryJobEvent queryJobEvent) {
+      if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) {
+        heartbeat(queryJobEvent.getQueryInfo());
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) {
+        masterContext.getResourceManager().startQueryMaster(QueryInProgress.this);
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) {
+        submmitQueryToMaster();
+      } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) {
+        stop();
+      }
+    }
+  }
+
+  public TajoWorkerProtocol.TajoWorkerProtocolService getQueryMasterRpcClient() {
+    return queryMasterRpcClient;
+  }
+
+  private void connectQueryMaster() throws Exception {
+    if(queryInfo.getQueryMasterResource() != null &&
+        queryInfo.getQueryMasterResource().getAllocatedHost() != null) {
+      InetSocketAddress addr = NetUtils.createSocketAddr(
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
+      LOG.info("Connect to QueryMaster:" + addr);
+      //TODO Get Connection from pool
+      queryMasterRpc = new ProtoAsyncRpcClient(TajoWorkerProtocol.class, addr);
+      queryMasterRpcClient = queryMasterRpc.getStub();
+    }
+  }
+
+  private synchronized void submmitQueryToMaster() {
+    if(querySubmitted.get()) {
+      return;
+    }
+
+    try {
+      if(queryMasterRpcClient == null) {
+        connectQueryMaster();
+      }
+      if(queryMasterRpcClient == null) {
+        LOG.info("No QueryMaster conneciton info.");
+        //TODO wait
+        return;
+      }
+      LOG.info("====>Call executeQuery to :" +
+          queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort() + "," + queryId);
+      queryMasterRpcClient.executeQuery(
+          null,
+          TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
+              .setQueryId(queryId.getProto())
+              .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
+              .build(), NullCallback.get());
+      querySubmitted.set(true);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  public void catchException(Exception e) {
+    LOG.error(e.getMessage(), e);
+    queryInfo.setQueryState(TajoProtos.QueryState.QUERY_FAILED);
+    queryInfo.setLastMessage(StringUtils.stringifyException(e));
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public QueryInfo getQueryInfo() {
+    return this.queryInfo;
+  }
+
+  private void heartbeat(QueryInfo queryInfo) {
+    LOG.info("Received QueryMaster heartbeat:" + queryInfo);
+    if(queryInfo.getQueryMasterResource() != null) {
+      this.queryInfo.setQueryMasterResource(queryInfo.getQueryMasterResource());
+    }
+    this.queryInfo.setQueryState(queryInfo.getQueryState());
+
+    if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) {
+      this.queryInfo.setLastMessage(queryInfo.getLastMessage());
+      LOG.info(queryId + queryInfo.getLastMessage());
+    }
+    if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) {
+      //TODO needed QueryMaster's detail status(failed before or after launching worker)
+      //queryMasterStopped.set(true);
+      LOG.warn(queryId + " failed, " + queryInfo.getLastMessage());
+    }
+
+    if(!querySubmitted.get()) {
+      getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo));
+    }
+
+    if(isFinishState(this.queryInfo.getQueryState())) {
+      getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo));
+    }
+  }
+
+  private boolean isFinishState(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_FAILED ||
+        state == TajoProtos.QueryState.QUERY_KILLED ||
+        state == TajoProtos.QueryState.QUERY_SUCCEEDED;
+  }
+
+//  private void checkQueryMasterShutdown() {
+//    //run background
+//    Thread t = new Thread() {
+//      public void run() {
+//        while(true) {
+//          try {
+//            if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
+//              queryMasterStopped.set(true);
+//              LOG.info("==========> " + queryId + " QueryMaster stopped");
+//              break;
+//            }
+//          } catch (Exception e) {
+//            LOG.error(e.getMessage(), e);
+//          }
+//          try {
+//            Thread.sleep(1000);
+//          } catch (InterruptedException e) {
+//            break;
+//          }
+//        }
+//      }
+//    };
+//
+//    t.start();
+//  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
new file mode 100644
index 0000000..e7ceae7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.master.rm.WorkerResource;
+
+public class QueryInfo {
+  private QueryId queryId;
+  private String sql;
+  private TajoProtos.QueryState queryState;
+  private float progress;
+  private long startTime;
+  private long finishTime;
+  private String lastMessage;
+  private WorkerResource queryMasterResource;
+
+  public QueryInfo(QueryId queryId) {
+    this(queryId, null);
+  }
+
+  public QueryInfo(QueryId queryId, String sql) {
+    this.queryId = queryId;
+    this.sql = sql;
+    this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  public String getQueryMasterHost() {
+    if(queryMasterResource == null) {
+      return null;
+    }
+    return queryMasterResource.getAllocatedHost();
+  }
+
+  public void setQueryMasterResource(WorkerResource queryMasterResource) {
+    this.queryMasterResource = queryMasterResource;
+  }
+
+  public int getQueryMasterPort() {
+    if(queryMasterResource == null) {
+      return 0;
+    }
+    return queryMasterResource.getPorts()[0];
+  }
+
+  public int getQueryMasterClientPort() {
+    if(queryMasterResource == null) {
+      return 0;
+    }
+    return queryMasterResource.getPorts()[1];
+  }
+
+  public TajoProtos.QueryState getQueryState() {
+    return queryState;
+  }
+
+  public void setQueryState(TajoProtos.QueryState queryState) {
+    this.queryState = queryState;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public String getLastMessage() {
+    return lastMessage;
+  }
+
+  public void setLastMessage(String lastMessage) {
+    this.lastMessage = lastMessage;
+  }
+
+  public WorkerResource getQueryMasterResource() {
+    return queryMasterResource;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  @Override
+  public String toString() {
+    return queryId.toString() + ", queryMaster=" + queryMasterResource;
+  }
+}


Mime
View raw message