tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [3/8] TAJO-127: Implement Tajo Resource Manager. (hyoungjunkim via hyunsik)
Date Mon, 26 Aug 2013 12:29:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
new file mode 100644
index 0000000..8756eed
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.rm;
+
+import com.google.protobuf.RpcCallback;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.client.YarnClient;
+import org.apache.hadoop.yarn.client.YarnClientImpl;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.master.YarnContainerProxy;
+import org.apache.tajo.master.querymaster.QueryInProgress;
+import org.apache.tajo.master.querymaster.QueryJobEvent;
+import org.apache.tajo.util.ApplicationIdUtils;
+import org.apache.tajo.worker.TajoWorker;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
+import java.util.*;
+
+public class YarnTajoResourceManager implements WorkerResourceManager {
+  private static final Log LOG = LogFactory.getLog(YarnTajoResourceManager.class);
+
+  private YarnClient yarnClient;
+  private AMRMProtocol rmClient;
+  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private Configuration conf;
+  private TajoMaster.MasterContext masterContext;
+
+  public YarnTajoResourceManager() {
+
+  }
+
+  public YarnTajoResourceManager(TajoMaster.MasterContext masterContext) {
+    this.masterContext = masterContext;
+  }
+
+  /**
+   * Stops the YARN client that {@link #init(Configuration)} created and started.
+   * Does nothing when the client was never created.
+   */
+  @Override
+  public void stop() {
+    // The client is started in connectYarnClient(); without this it is never stopped.
+    if (yarnClient != null) {
+      yarnClient.stop();
+    }
+  }
+
+  /**
+   * Intentionally empty: YARN manages worker membership, so heartbeats carry
+   * nothing this manager needs to record.
+   */
+  @Override
+  public void workerHeartbeat(TajoMasterProtocol.TajoHeartbeat request) {
+  }
+
+  /** Intentionally empty in this implementation. */
+  @Override
+  public void releaseWorkerResource(QueryId queryId, WorkerResource workerResource) {
+  }
+
+  /**
+   * Always returns {@code null}; the QueryMaster is allocated and launched in
+   * one step by {@link #startQueryMaster(QueryInProgress)}.
+   */
+  @Override
+  public WorkerResource allocateQueryMaster(QueryInProgress queryInProgress) {
+    return null;
+  }
+
+  /** Intentionally empty; the callback is never invoked by this implementation. */
+  @Override
+  public void allocateWorkerResources(
+      TajoMasterProtocol.WorkerResourceAllocationRequest request,
+      RpcCallback<TajoMasterProtocol.WorkerResourceAllocationResponse> rpcCallBack) {
+  }
+
+  @Override
+  public void startQueryMaster(QueryInProgress queryInProgress) {
+    try {
+      allocateAndLaunchQueryMaster(queryInProgress);
+
+      queryInProgress.getEventHandler().handle(
+          new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo()));
+
+    } catch (YarnRemoteException e) {
+      LOG.error(e.getMessage(), e);
+      //TODO set QueryState(fail)
+    }
+  }
+
+  /**
+   * Stores the configuration, starts the YARN client and opens an
+   * {@link AMRMProtocol} proxy to the scheduler address as the current user.
+   *
+   * @param conf configuration used for the client, the RPC layer and the scheduler address
+   * @throws YarnException if the current user cannot be determined
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    connectYarnClient();
+
+    final YarnConfiguration yarnConfiguration = new YarnConfiguration(conf);
+    final YarnRPC yarnRpc = YarnRPC.create(conf);
+    final InetSocketAddress schedulerAddress = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    final UserGroupInformation user;
+    try {
+      user = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    // The proxy is created inside doAs so that it is opened as the current user.
+    PrivilegedAction<AMRMProtocol> openProxy = new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        Object proxy = yarnRpc.getProxy(AMRMProtocol.class, schedulerAddress, yarnConfiguration);
+        return (AMRMProtocol) proxy;
+      }
+    };
+    rmClient = user.doAs(openProxy);
+  }
+
+  @Override
+  public String getSeedQueryId() throws IOException {
+    try {
+      GetNewApplicationResponse newApp = yarnClient.getNewApplication();
+      ApplicationId appId = newApp.getApplicationId();
+
+      return appId.toString();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Unregisters the QueryMaster of the given query from the YARN ResourceManager.
+   * <p>
+   * The final application status is derived from the query state: SUCCEEDED for
+   * {@code QUERY_SUCCEEDED}, FAILED for {@code QUERY_FAILED} or {@code QUERY_ERROR},
+   * and UNDEFINED for any other state. Nothing is sent when the query is not known
+   * to the query job manager. Failures are logged and not propagated.
+   *
+   * @param queryId id of the query whose QueryMaster is being stopped
+   */
+  @Override
+  public void stopQueryMaster(QueryId queryId) {
+    try {
+      QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId);
+      if (queryInProgress == null) {
+        // Unknown query: there is no state to report to the ResourceManager.
+        return;
+      }
+
+      FinalApplicationStatus appStatus = FinalApplicationStatus.UNDEFINED;
+      TajoProtos.QueryState state = queryInProgress.getQueryInfo().getQueryState();
+      if (state == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+        appStatus = FinalApplicationStatus.SUCCEEDED;
+      } else if (state == TajoProtos.QueryState.QUERY_FAILED || state == TajoProtos.QueryState.QUERY_ERROR) {
+        appStatus = FinalApplicationStatus.FAILED;
+      }
+
+      FinishApplicationMasterRequest request = recordFactory
+          .newRecordInstance(FinishApplicationMasterRequest.class);
+      request.setAppAttemptId(ApplicationIdUtils.createApplicationAttemptId(queryId));
+      request.setFinishApplicationStatus(appStatus);
+      request.setDiagnostics("QueryMaster shutdown by TajoMaster.");
+      rmClient.finishApplicationMaster(request);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /** Creates the YARN client, initializes it with the stored configuration and starts it. */
+  private void connectYarnClient() {
+    yarnClient = new YarnClientImpl();
+    yarnClient.init(conf);
+    yarnClient.start();
+  }
+
+  /**
+   * Builds the application submission context for the QueryMaster of the given
+   * query, submits it through the YARN client and polls until the application is
+   * reported as ACCEPTED.
+   * <p>
+   * NOTE(review): ACCEPTED is the only state that ends the wait; if the
+   * application is first observed in some other state, monitorApplication keeps
+   * polling - confirm this is intended.
+   *
+   * @param queryInProgress the query whose QueryMaster is to be launched
+   * @return the current application attempt id taken from the ACCEPTED report
+   * @throws YarnRemoteException if submitting or polling the application fails
+   */
+  private ApplicationAttemptId allocateAndLaunchQueryMaster(QueryInProgress queryInProgress)
+      throws YarnRemoteException {
+    final QueryId queryId = queryInProgress.getQueryId();
+    final ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
+
+    LOG.info("Allocate and launch ApplicationMaster for QueryMaster: queryId=" +
+        queryId + ", appId=" + appId);
+
+    // Application-level settings: id, display name, priority and target queue.
+    ApplicationSubmissionContext submission = Records.newRecord(ApplicationSubmissionContext.class);
+    submission.setApplicationId(appId);
+    submission.setApplicationName("Tajo");
+
+    Priority priority = Records.newRecord(Priority.class);
+    priority.setPriority(5);
+    submission.setPriority(priority);
+
+    submission.setQueue("default");
+
+    ContainerLaunchContext commonContext =
+        YarnContainerProxy.createCommonContainerLaunchContext(masterContext.getConf(), queryId.toString(), true);
+
+    // Work on a private copy of the common environment.
+    Map<String, String> launchEnv = new HashMap<String, String>(commonContext.getEnvironment());
+
+    // Assemble the QueryMaster command line token by token.
+    List<String> launchArgs = new ArrayList<String>(30);
+    launchArgs.add("${JAVA_HOME}" + "/bin/java");
+
+    // JVM options come from configuration; the default only sets the max heap.
+    String jvmOptions = masterContext.getConf().get("tajo.rm.yarn.querymaster.jvm.option", "-Xmx2000m");
+    Collections.addAll(launchArgs, jvmOptions.split(" "));
+
+    // Main class followed by its arguments: mode, query id and master address.
+    launchArgs.add(TajoWorker.class.getCanonicalName());
+    launchArgs.add("qm");
+    launchArgs.add(queryId.toString());
+    launchArgs.add(masterContext.getTajoMasterService().getBindAddress().getHostName() + ":" +
+        masterContext.getTajoMasterService().getBindAddress().getPort());
+
+    // Redirect the process output into the container log directory.
+    launchArgs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+    launchArgs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+
+    // Join the tokens into the final command; every token is followed by a space.
+    StringBuilder commandLine = new StringBuilder();
+    for (String arg : launchArgs) {
+      commandLine.append(arg).append(" ");
+    }
+
+    LOG.info("Completed setting up QueryMasterRunner command " + commandLine.toString());
+    List<String> commands = new ArrayList<String>();
+    commands.add(commandLine.toString());
+
+    // TODO - get default value from conf
+    final Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(256);
+    capability.setVirtualCores(1);
+
+    Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+
+    ContainerLaunchContext masterContainerContext = BuilderUtils.newContainerLaunchContext(
+        null, commonContext.getUser(),
+        capability, commonContext.getLocalResources(), launchEnv, commands,
+        serviceData, null, new HashMap<ApplicationAccessType, String>(2));
+
+    submission.setAMContainerSpec(masterContainerContext);
+
+    LOG.info("Submitting QueryMaster to ResourceManager");
+    yarnClient.submitApplication(submission);
+
+    ApplicationReport acceptedReport = monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED));
+    ApplicationAttemptId attemptId = acceptedReport.getCurrentApplicationAttemptId();
+
+    LOG.info("Launching QueryMaster with appAttemptId: " + attemptId);
+
+    return attemptId;
+  }
+
+  /**
+   * Polls the YARN client for the report of the given application until its state
+   * is one of the requested states, logging every report received.
+   * <p>
+   * The pause between polls starts at 100 ms and grows in 100 ms steps up to one
+   * second. An interrupt received while pausing does not end the wait, but the
+   * thread's interrupt status is restored before this method returns or throws.
+   *
+   * @param appId the application to watch
+   * @param finalState the states that end the wait
+   * @return the first report whose state is contained in {@code finalState}
+   * @throws YarnRemoteException if fetching a report fails
+   */
+  private ApplicationReport monitorApplication(ApplicationId appId,
+                                               Set<YarnApplicationState> finalState) throws YarnRemoteException {
+
+    long sleepTime = 100;
+    int count = 1;
+    // Remember interrupts instead of dropping them; Thread.sleep clears the flag when it throws.
+    boolean interrupted = false;
+    try {
+      while (true) {
+        // Get application report for the appId we are interested in
+        ApplicationReport report = yarnClient.getApplicationReport(appId);
+
+        LOG.info("Got application report from ASM for" + ", appId="
+            + appId.getId() + ", appAttemptId="
+            + report.getCurrentApplicationAttemptId() + ", clientToken="
+            + report.getClientToken() + ", appDiagnostics="
+            + report.getDiagnostics() + ", appMasterHost=" + report.getHost()
+            + ", appQueue=" + report.getQueue() + ", appMasterRpcPort="
+            + report.getRpcPort() + ", appStartTime=" + report.getStartTime()
+            + ", yarnAppState=" + report.getYarnApplicationState().toString()
+            + ", distributedFinalState="
+            + report.getFinalApplicationStatus().toString() + ", appTrackingUrl="
+            + report.getTrackingUrl() + ", appUser=" + report.getUser());
+
+        YarnApplicationState state = report.getYarnApplicationState();
+        if (finalState.contains(state)) {
+          return report;
+        }
+        try {
+          Thread.sleep(sleepTime);
+          sleepTime = count * 100;
+          if (count < 10) {
+            count++;
+          }
+        } catch (InterruptedException e) {
+          interrupted = true;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        // Hand the interrupt back to the caller once polling is over.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Reports whether the YARN application backing the given query has reached a
+   * terminal state (FINISHED, KILLED or FAILED).
+   *
+   * @param queryId id of the query to check
+   * @return {@code true} for a terminal state; {@code false} otherwise, including
+   *         when the application report cannot be fetched
+   */
+  public boolean isQueryMasterStopped(QueryId queryId) {
+    final ApplicationId appId = ApplicationIdUtils.queryIdToAppId(queryId);
+    final YarnApplicationState state;
+    try {
+      state = yarnClient.getApplicationReport(appId).getYarnApplicationState();
+    } catch (YarnRemoteException e) {
+      LOG.error(e.getMessage(), e);
+      return false;
+    }
+    return state == YarnApplicationState.FINISHED
+        || state == YarnApplicationState.KILLED
+        || state == YarnApplicationState.FAILED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
new file mode 100644
index 0000000..5fe1b74
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/util/ApplicationIdUtils.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.util;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+
+public class ApplicationIdUtils {
+
+  public static ApplicationAttemptId createApplicationAttemptId(QueryId queryId) {
+    return BuilderUtils.newApplicationAttemptId(queryIdToAppId(queryId), 1);
+  }
+
+  public static ApplicationId queryIdToAppId(QueryId queryId) {
+    return BuilderUtils.newApplicationId(Long.parseLong(queryId.getId()), queryId.getSeq());
+  }
+
+  public static QueryId appIdToQueryId(YarnProtos.ApplicationIdProto appId) {
+    return QueryIdFactory.newQueryId(appId.getClusterTimestamp(), appId.getId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
index f185cc6..b057561 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/webapp/StaticHttpServer.java
@@ -19,27 +19,24 @@
 package org.apache.tajo.webapp;
 
 import org.apache.hadoop.conf.Configuration;
-import org.mortbay.jetty.Connector;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.master.TajoMaster;
+import org.mortbay.jetty.Connector;
 
 import java.io.IOException;
 
 public class StaticHttpServer extends HttpServer {
   private static StaticHttpServer instance = null;
-  private TajoMaster master = null;
-  
-  private StaticHttpServer(TajoMaster master , String name, String bindAddress, int port,
+
+  private StaticHttpServer(Object containerObject , String name, String bindAddress, int port,
       boolean findPort, Connector connector, Configuration conf,
       String[] pathSpecs) throws IOException {
     super( name, bindAddress, port, findPort, connector, conf, pathSpecs);
-    this.master = master;
   }
   public static StaticHttpServer getInstance() {
     return instance;
   }
-  public static StaticHttpServer getInstance( TajoMaster master, String name,
+  public static StaticHttpServer getInstance(Object containerObject, String name,
       String bindAddress, int port, boolean findPort, Connector connector,
       TajoConf conf,
       String[] pathSpecs) throws IOException {
@@ -49,19 +46,16 @@ public class StaticHttpServer extends HttpServer {
         addr = conf.getVar(ConfVars.TASKRUNNER_LISTENER_ADDRESS).split(":")[0];
       }
       
-      instance = new StaticHttpServer(master, name, addr, port,
+      instance = new StaticHttpServer(containerObject, name, addr, port,
           findPort, connector, conf, pathSpecs);
-      instance.setAttribute("tajo.master", master);
-      instance.setAttribute("tajo.master.addr", addr);
-      instance.setAttribute("tajo.master.conf", conf);
-      instance.setAttribute("tajo.master.starttime", System.currentTimeMillis());
+      instance.setAttribute("tajo.info.server.object", containerObject);
+      instance.setAttribute("tajo.info.server.addr", addr);
+      instance.setAttribute("tajo.info.server.conf", conf);
+      instance.setAttribute("tajo.info.server.starttime", System.currentTimeMillis());
     }
     return instance;
   }
-  public TajoMaster getMaster() {
-    
-    return this.master;
-  }
+
   public void set(String name, Object object) {
     instance.setAttribute(name, object);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
new file mode 100644
index 0000000..ebc3f08
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.master.ContainerProxy;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for resource allocators. It keeps a registry of container proxies
+ * keyed by container id, plus the container capability bounds and cluster node
+ * count supplied by callers.
+ * <p>
+ * NOTE(review): the registry is a plain {@link HashMap} and nothing in this class
+ * synchronizes access to it - confirm callers do not use it concurrently.
+ */
+public abstract class AbstractResourceAllocator extends CompositeService implements ResourceAllocator {
+  private int minCapability;
+  private int maxCapability;
+  private int numCluster;
+
+  private final Map<ContainerId, ContainerProxy> containers = new HashMap<ContainerId, ContainerProxy>();
+
+  public AbstractResourceAllocator() {
+    super(AbstractResourceAllocator.class.getName());
+  }
+
+  /** Registers the proxy for a container, replacing any proxy stored under the same id. */
+  public void addContainer(ContainerId cId, ContainerProxy container) {
+    containers.put(cId, container);
+  }
+
+  /** Removes the proxy registered under the given id, if any. */
+  public void removeContainer(ContainerId cId) {
+    containers.remove(cId);
+  }
+
+  /** Returns whether a proxy is registered under the given id. */
+  public boolean containsContainer(ContainerId cId) {
+    return containers.containsKey(cId);
+  }
+
+  /** Returns the proxy registered under the given id, or {@code null} when there is none. */
+  public ContainerProxy getContainer(ContainerId cId) {
+    return containers.get(cId);
+  }
+
+  /** Returns the live registry itself, not a copy; changes made by the caller are visible here. */
+  public Map<ContainerId, ContainerProxy> getContainers() {
+    return containers;
+  }
+
+  public void setMaxContainerCapability(int capability) {
+    this.maxCapability = capability;
+  }
+
+  public int getMaxContainerCapability() {
+    return this.maxCapability;
+  }
+
+  public void setMinContainerCapability(int capability) {
+    this.minCapability = capability;
+  }
+
+  /** Returns the value last passed to {@link #setMinContainerCapability(int)}. */
+  public int getMinContainerCapability() {
+    return this.minCapability;
+  }
+
+  public int getNumClusterNode() {
+    return numCluster;
+  }
+
+  public void setNumClusterNodes(int num) {
+    numCluster = num;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
new file mode 100644
index 0000000..108c7b7
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+
+/**
+ * Contract for components that allocate task workers and build container ids
+ * from their protobuf form.
+ */
+public interface ResourceAllocator {
+  /** Allocates a task worker; the exact effect is defined by the implementation. */
+  void allocateTaskWorker();
+
+  /** Builds a {@link ContainerId} from the given container id proto. */
+  ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
new file mode 100644
index 0000000..3264f55
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryConf;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.ContainerProxy;
+import org.apache.tajo.master.TajoContainerProxy;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.event.ContainerAllocationEvent;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.querymaster.SubQuery;
+import org.apache.tajo.master.querymaster.SubQueryState;
+import org.apache.tajo.master.rm.TajoWorkerContainer;
+import org.apache.tajo.master.rm.TajoWorkerContainerId;
+import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.rpc.CallFuture2;
+import org.apache.tajo.util.ApplicationIdUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TajoResourceAllocator extends AbstractResourceAllocator {
+  private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class);
+
+  static AtomicInteger containerIdSeq = new AtomicInteger(0);
+  private TajoConf tajoConf;
+  private QueryMasterTask.QueryContext queryContext;
+  private final ExecutorService executorService;
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  /**
+   * Creates an allocator bound to the given query context. The worker pool is a
+   * fixed-size pool sized by {@code AM_TASKRUNNER_LAUNCH_PARALLEL_NUM} from the
+   * query configuration.
+   */
+  public TajoResourceAllocator(QueryMasterTask.QueryContext queryContext) {
+    this.queryContext = queryContext;
+    int launchParallelism =
+        queryContext.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM);
+    this.executorService = Executors.newFixedThreadPool(launchParallelism);
+  }
+
+  /**
+   * Builds a {@link TajoWorkerContainerId} carrying the application attempt id
+   * and the numeric id found in the given proto.
+   */
+  @Override
+  public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerIdProto) {
+    TajoWorkerContainerId workerContainerId = new TajoWorkerContainerId();
+    workerContainerId.setApplicationAttemptId(
+        new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId()));
+    workerContainerId.setId(containerIdProto.getId());
+    return workerContainerId;
+  }
+
+  /** Empty in this implementation. */
+  @Override
+  public void allocateTaskWorker() {
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    tajoConf = (TajoConf)conf;
+
+    queryContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
+//
+    queryContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
+
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    if(stopped.get()) {
+      return;
+    }
+    stopped.set(true);
+    executorService.shutdownNow();
+
+    while(!executorService.isTerminated()) {
+      LOG.info("====>executorService.isTerminated:" + executorService.isTerminated() + "," +
+          executorService.isShutdown());
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+    }
+    Map<ContainerId, ContainerProxy> containers = queryContext.getResourceAllocator().getContainers();
+    for(ContainerProxy eachProxy: containers.values()) {
+      try {
+        eachProxy.stopContainer();
+      } catch (Exception e) {
+      }
+    }
+    super.stop();
+  }
+
+  /** Starts the service; adds nothing beyond the superclass behaviour. */
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  final public static FsPermission QUERYCONF_FILE_PERMISSION =
+      FsPermission.createImmutable((short) 0644); // rw-r--r--
+
+  /**
+   * Writes the configuration as XML to the given path, on the file system that
+   * owns the path, with the {@code QUERYCONF_FILE_PERMISSION} permission.
+   *
+   * @param conf configuration to serialize; also used to resolve the file system
+   * @param queryConfFile destination file
+   * @throws IOException if the file cannot be created, written or closed
+   */
+  private static void writeConf(Configuration conf, Path queryConfFile)
+      throws IOException {
+    FileSystem targetFs = queryConfFile.getFileSystem(conf);
+    FsPermission permission = new FsPermission(QUERYCONF_FILE_PERMISSION);
+    FSDataOutputStream stream = FileSystem.create(targetFs, queryConfFile, permission);
+    try {
+      conf.writeXml(stream);
+    } finally {
+      // Close even when serialization fails.
+      stream.close();
+    }
+  }
+
+  /**
+   * Dispatches task-runner group events: remote-launch events start task runners
+   * for the event's containers, remote-cleanup events stop them. Other event
+   * types are ignored.
+   */
+  class TajoTaskRunnerLauncher implements TaskRunnerLauncher {
+    @Override
+    public void handle(TaskRunnerGroupEvent event) {
+      if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) {
+        launchTaskRunners(event.getExecutionBlockId(), event.getContainers());
+        return;
+      }
+      if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
+        stopContainers(event.getContainers());
+      }
+    }
+  }
+
+  /**
+   * Makes sure the query configuration file exists under the warehouse directory
+   * of the query, then submits one launch task per container to the worker pool.
+   * <p>
+   * Any {@link IOException} while obtaining the file system or writing the file
+   * is logged and the containers are still launched.
+   *
+   * @param executionBlockId execution block the containers are launched for
+   * @param containers containers to start task runners on
+   */
+  private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
+    QueryConf queryConf = queryContext.getConf();
+    LOG.info("defaultFS: " + queryConf.get("fs.default.name"));
+    LOG.info("defaultFS: " + queryConf.get("fs.defaultFS"));
+
+    try {
+      // Obtained inside the same try as its use, so a failure here is handled like
+      // a write failure instead of leaving a null file system to be dereferenced.
+      FileSystem fs = FileSystem.get(queryConf);
+
+      // TODO move to tajo temp
+      Path warehousePath = new Path(queryConf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
+      Path queryConfPath = new Path(warehousePath, executionBlockId.getQueryId().toString());
+      queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
+
+      if (!fs.exists(queryConfPath)) {
+        LOG.info("Writing a QueryConf to HDFS and add to local environment, outputPath=" + queryConf.getOutputPath());
+        writeConf(queryConf, queryConfPath);
+      } else {
+        LOG.warn("QueryConf already exist. path: "  + queryConfPath.toString());
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+    }
+    //Query in standby mode doesn't need launch Worker.
+    //But, Assign ExecutionBlock to assigned tajo worker
+    for (Container eachContainer : containers) {
+      TajoContainerProxy containerProxy = new TajoContainerProxy(queryContext, tajoConf,
+          eachContainer, executionBlockId);
+      executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
+    }
+  }
+
+  /** Task that launches a single container proxy and logs the container id afterwards. */
+  protected class LaunchRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+
+    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      // launch() is invoked with a null argument, as in the original call.
+      proxy.launch(null);
+      LOG.info("ContainerProxy started:" + id);
+    }
+  }
+
+  /**
+   * Submits one stop task per container, using the proxy registered for the
+   * container's id in the query's resource allocator.
+   *
+   * @param containers containers to stop
+   */
+  private void stopContainers(Collection<Container> containers) {
+    for (Container eachContainer : containers) {
+      final ContainerProxy registeredProxy =
+          queryContext.getResourceAllocator().getContainer(eachContainer.getId());
+      StopContainerRunner stopTask = new StopContainerRunner(eachContainer.getId(), registeredProxy);
+      executorService.submit(stopTask);
+    }
+  }
+
+  private class StopContainerRunner implements Runnable {
+    private final ContainerProxy proxy;
+    private final ContainerId id;
+    public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+      this.id = id;
+      this.proxy = proxy;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("ContainerProxy stopped:" + id + "," + proxy.getId());
+      proxy.stopContainer();
+    }
+  }
+
+  class TajoWorkerAllocationHandler implements EventHandler<ContainerAllocationEvent> {
+    @Override
+    public void handle(ContainerAllocationEvent event) {
+      executorService.submit(new TajoWorkerAllocationThread(event)); // handed to executorService as a task; start() is not called here
+    }
+  }
+
+  /**
+   * Requests worker resources for one execution block from the TajoMaster, wraps
+   * every granted worker in a {@link TajoWorkerContainer} and passes the containers
+   * to the query's event handler as a {@link SubQueryContainerAllocationEvent}.
+   *
+   * If the sub query is no longer in a running state when the response arrives,
+   * the granted resources are released again and the thread returns.
+   */
+  class TajoWorkerAllocationThread extends Thread {
+    ContainerAllocationEvent event;
+    TajoWorkerAllocationThread(ContainerAllocationEvent event) {
+      this.event = event;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("======> Start TajoWorkerAllocationThread");
+      CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
+          new CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+
+      int requiredMemoryMBSlot = 512;  //TODO
+      int requiredDiskSlots = 1;  //TODO
+      TajoMasterProtocol.WorkerResourceAllocationRequest request =
+          TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+              .setMemoryMBSlots(requiredMemoryMBSlot)
+              .setDiskSlots(requiredDiskSlots)
+              .setNumWorks(event.getRequiredNum())
+              .setExecutionBlockId(event.getExecutionBlockId().getProto())
+              .build();
+
+      queryContext.getQueryMasterContext().getWorkerContext().
+          getTajoMasterRpcClient().allocateWorkerResources(null, request, callBack);
+
+      int numAllocatedWorkers = 0;
+      while(!stopped.get()) {
+        TajoMasterProtocol.WorkerResourceAllocationResponse response;
+        try {
+          response = callBack.get(3, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+          if(stopped.get()) {
+            break;
+          }
+          // Interrupted without a shutdown request: there is no response yet, so
+          // wait again. Falling through here used to dereference a null response.
+          continue;
+        } catch (TimeoutException e) {
+          LOG.info("No available worker resource for " + event.getExecutionBlockId());
+          continue;
+        }
+
+        List<String> workerHosts = response.getAllocatedWorksList();
+        ExecutionBlockId executionBlockId = event.getExecutionBlockId();
+
+        List<Container> containers = new ArrayList<Container>();
+        for(String eachWorker: workerHosts) {
+          containers.add(createContainer(eachWorker, executionBlockId,
+              requiredMemoryMBSlot, requiredDiskSlots));
+        }
+
+        SubQueryState state = queryContext.getSubQuery(executionBlockId).getState();
+        if (!SubQuery.isRunningState(state)) {
+          // The sub query is not running any more: give the resources back.
+          List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
+          for(Container eachContainer: containers) {
+            workerResources.add(((TajoWorkerContainer)eachContainer).getWorkerResource());
+          }
+          try {
+            TajoContainerProxy.releaseWorkerResource(queryContext, executionBlockId, workerResources);
+          } catch (Exception e) {
+            LOG.error(e.getMessage(), e);
+          }
+          return;
+        }
+
+        if (workerHosts.size() > 0) {
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
+          }
+          queryContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
+        }
+        numAllocatedWorkers += workerHosts.size();
+        if(numAllocatedWorkers >= event.getRequiredNum()) {
+          break;
+        }
+      }
+      LOG.info("======> Stop TajoWorkerAllocationThread");
+    }
+
+    /**
+     * Builds the container describing one granted worker.
+     *
+     * @param workerAddress    worker address in {@code host:port} form
+     * @param executionBlockId execution block the worker was granted for
+     * @param memoryMBSlots    memory slots recorded in the worker resource
+     * @param diskSlots        disk slots recorded in the worker resource
+     * @return a container carrying a fresh container id, the node id and the worker resource
+     */
+    private TajoWorkerContainer createContainer(String workerAddress, ExecutionBlockId executionBlockId,
+                                                int memoryMBSlots, int diskSlots) {
+      TajoWorkerContainer container = new TajoWorkerContainer();
+      NodeIdPBImpl nodeId = new NodeIdPBImpl();
+      String[] tokens = workerAddress.split(":");
+
+      nodeId.setHost(tokens[0]);
+      nodeId.setPort(Integer.parseInt(tokens[1]));
+
+      TajoWorkerContainerId containerId = new TajoWorkerContainerId();
+
+      containerId.setApplicationAttemptId(
+          ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId()));
+      containerId.setId(containerIdSeq.incrementAndGet());
+
+      container.setId(containerId);
+      container.setNodeId(nodeId);
+
+      WorkerResource workerResource = new WorkerResource();
+      workerResource.setAllocatedHost(nodeId.getHost());
+      workerResource.setPorts(new int[]{nodeId.getPort()});
+      workerResource.setMemoryMBSlots(memoryMBSlots);
+      workerResource.setDiskSlots(diskSlots);
+
+      container.setWorkerResource(workerResource);
+      return container;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
new file mode 100644
index 0000000..3b116f2
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.rm.TajoWorkerResourceManager;
+import org.apache.tajo.pullserver.TajoPullServerService;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.ProtoAsyncRpcClient;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.webapp.StaticHttpServer;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tajo worker daemon. The daemon mode given to the constructor selects its role:
+ * "qm" reports a launched query master to the TajoMaster, "tr" starts a task via
+ * the task runner manager, and any other value runs the worker in standby mode,
+ * in which a heartbeat thread periodically reports resources to the TajoMaster.
+ */
+public class TajoWorker extends CompositeService {
+  public static PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  public static PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+  private static final Log LOG = LogFactory.getLog(TajoWorker.class);
+
+  private TajoConf tajoConf;
+
+  private StaticHttpServer webServer;
+
+  private TajoWorkerClientService tajoWorkerClientService;
+
+  private TajoWorkerManagerService tajoWorkerManagerService;
+
+  //to TajoMaster
+  private ProtoAsyncRpcClient tajoMasterRpc;
+
+  private TajoMasterProtocol.TajoMasterProtocolService tajoMasterRpcClient;
+
+  private WorkerContext workerContext;
+
+  private TaskRunnerManager taskRunnerManager;
+
+  private TajoPullServerService pullService;
+
+  private final String daemonMode;
+
+  private WorkerHeartbeatThread workerHeartbeatThread;
+
+  private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+  /**
+   * @param daemonMode "qm", "tr", or any other value for standby mode
+   */
+  public TajoWorker(String daemonMode) throws Exception {
+    super(TajoWorker.class.getName());
+    this.daemonMode = daemonMode;
+  }
+
+  /**
+   * Registers the shutdown hook, starts the info web server and adds the child
+   * services required by the daemon mode before initializing the composite service.
+   *
+   * @param conf configuration; it is cast to {@link TajoConf}
+   */
+  @Override
+  public void init(Configuration conf) {
+    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
+
+    this.tajoConf = (TajoConf)conf;
+    RackResolver.init(tajoConf);
+
+    workerContext = new WorkerContext();
+
+    String resourceManagerClassName = conf.get("tajo.resource.manager",
+        TajoWorkerResourceManager.class.getCanonicalName());
+
+    // The configured ports are only used when the resource manager is
+    // TajoWorkerResourceManager; otherwise port 0 is requested for the RPC services.
+    boolean randomPort = !resourceManagerClassName.contains(TajoWorkerResourceManager.class.getName());
+
+    int infoPort = tajoConf.getInt("tajo.worker.info.port", 8090);
+    int clientPort = tajoConf.getInt("tajo.worker.client.rpc.port", 8091);
+    int managerPort = tajoConf.getInt("tajo.worker.manager.rpc.port", 8092);
+
+    if(randomPort) {
+      clientPort = 0;
+      managerPort = 0;
+      tajoConf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname, 0);
+      //infoPort = 0;
+    }
+    try {
+      //TODO WebServer port configurable
+      webServer = StaticHttpServer.getInstance(this, "admin", null, infoPort,
+          true, null, tajoConf, null);
+      webServer.start();
+    } catch (Exception e) {
+      // A failing info server is logged only; the worker keeps initializing.
+      LOG.error("Can' start info http server:" + e.getMessage(), e);
+    }
+
+    if(!"qm".equals(daemonMode)) {
+      taskRunnerManager = new TaskRunnerManager(workerContext);
+      addService(taskRunnerManager);
+    }
+
+    if(workerContext.isStandbyMode()) {
+      pullService = new TajoPullServerService();
+      addService(pullService);
+    }
+
+    if(!"tr".equals(daemonMode)) {
+      tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
+      addService(tajoWorkerClientService);
+
+      tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, managerPort);
+      addService(tajoWorkerManagerService);
+      LOG.info("====> Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort=" + managerPort);
+    } else {
+      LOG.info("====> Tajo worker started: mode=" + daemonMode);
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * Stops the web server, the heartbeat thread, the TajoMaster RPC client and the
+   * child services. Only the first call has an effect.
+   */
+  @Override
+  public void stop() {
+    // getAndSet tests and sets the flag in one atomic step, so two concurrent
+    // callers (for example the shutdown hook and stopWorker) cannot both proceed.
+    if(stopped.getAndSet(true)) {
+      return;
+    }
+    if(webServer != null) {
+      try {
+        webServer.stop();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    }
+    if(workerHeartbeatThread != null) {
+      workerHeartbeatThread.interrupt();
+    }
+
+    if(tajoMasterRpc != null) {
+      tajoMasterRpc.close();
+    }
+    super.stop();
+    LOG.info("TajoWorker main thread exiting");
+  }
+
+  /** Gives the worker's services access to each other and to worker-level operations. */
+  public class WorkerContext {
+    public QueryMaster getQueryMaster() {
+      return tajoWorkerManagerService.getQueryMaster();
+    }
+
+    public TajoWorkerManagerService getTajoWorkerManagerService() {
+      return tajoWorkerManagerService;
+    }
+
+    public TajoWorkerClientService getTajoWorkerClientService() {
+      return tajoWorkerClientService;
+    }
+
+    public TajoMasterProtocol.TajoMasterProtocolService getTajoMasterRpcClient() {
+      return tajoMasterRpcClient;
+    }
+
+    public TaskRunnerManager getTaskRunnerManager() {
+      return taskRunnerManager;
+    }
+
+    public TajoPullServerService getPullService() {
+      return pullService;
+    }
+
+    /**
+     * Stops the worker.
+     *
+     * @param force when true, the JVM is terminated with exit code 0 after stopping
+     */
+    public void stopWorker(boolean force) {
+      stop();
+      if(force) {
+        System.exit(0);
+      }
+    }
+
+    /** @return true when the daemon mode is neither "qm" nor "tr" */
+    public boolean isStandbyMode() {
+      return !"qm".equals(daemonMode) && !"tr".equals(daemonMode);
+    }
+  }
+
+  public void stopWorkerForce() {
+    stop();
+  }
+
+  /**
+   * Performs the mode specific start-up work after the services were started.
+   *
+   * @param params start-up arguments; in "qm" mode {@code params[1]} is parsed as the
+   *               query id and {@code params[2]} is used as the TajoMaster address,
+   *               in "tr" mode the whole array is passed to the task runner manager
+   */
+  private void setWorkerMode(String[] params) {
+    if("qm".equals(daemonMode)) {
+      //QueryMaster mode
+      connectToTajoMaster(params[2]);
+
+      QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
+      tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
+          queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
+    } else if("tr".equals(daemonMode)) {
+      //TaskRunner mode
+      taskRunnerManager.startTask(params);
+    } else {
+      //Standby mode
+      connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
+      workerHeartbeatThread = new WorkerHeartbeatThread();
+      workerHeartbeatThread.start();
+    }
+  }
+
+  /**
+   * Creates the RPC client and stub for the TajoMaster at the given address.
+   * A connection failure is logged and leaves the client fields unset.
+   *
+   * @param tajoMasterAddress address of the TajoMaster
+   */
+  private void connectToTajoMaster(String tajoMasterAddress) {
+    LOG.info("Init TajoMaster connection to:" + tajoMasterAddress);
+    InetSocketAddress addr = NetUtils.createSocketAddr(tajoMasterAddress);
+    try {
+      tajoMasterRpc = new ProtoAsyncRpcClient(TajoMasterProtocol.class, addr);
+      tajoMasterRpcClient = tajoMasterRpc.getStub();
+    } catch (Exception e) {
+      LOG.error("Can't connect to TajoMaster[" + addr + "], " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Sends a heartbeat with the worker's addresses and resource slots to the
+   * TajoMaster every ten seconds until the thread is interrupted or the worker
+   * is stopped.
+   */
+  class WorkerHeartbeatThread extends Thread {
+    TajoMasterProtocol.ServerStatusProto.System systemInfo;
+    List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
+        new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+    int workerDiskSlots;
+    List<File> mountPaths;
+
+    public WorkerHeartbeatThread() {
+      int workerMemoryMBSlots;
+      int workerCpuCoreSlots;
+
+      boolean useSystemInfo = tajoConf.getBoolean("tajo.worker.slots.use.os.info", true);
+
+      try {
+        mountPaths = getMountPath();
+      } catch (Exception e) {
+        // mountPaths stays null; the defaults below are used instead.
+        LOG.error(e.getMessage(), e);
+      }
+
+      if(useSystemInfo) {
+        float memoryRatio = tajoConf.getFloat("tajo.worker.slots.os.memory.ratio", 0.8f);
+        workerMemoryMBSlots = getTotalMemoryMB();
+        workerMemoryMBSlots = (int)((float)(workerMemoryMBSlots) * memoryRatio);
+        workerCpuCoreSlots = Runtime.getRuntime().availableProcessors();
+        // An empty mount list would otherwise advertise zero disk slots.
+        if(mountPaths == null || mountPaths.isEmpty()) {
+          workerDiskSlots = 2;
+        } else {
+          workerDiskSlots = mountPaths.size();
+        }
+      } else {
+        workerMemoryMBSlots = tajoConf.getInt("tajo.worker.slots.memoryMB", 2048);
+        workerDiskSlots = tajoConf.getInt("tajo.worker.slots.disk", 2);
+        workerCpuCoreSlots = tajoConf.getInt("tajo.worker.slots.cpu.core", 4);
+      }
+
+      workerDiskSlots = workerDiskSlots * tajoConf.getInt("tajo.worker.slots.disk.concurrency", 4);
+
+      systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+          .setAvailableProcessors(workerCpuCoreSlots)
+          .setFreeMemoryMB(0)
+          .setMaxMemoryMB(0)
+          .setTotalMemoryMB(workerMemoryMBSlots)
+          .build();
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Worker Resource Heartbeat Thread start.");
+      int sendDiskInfoCount = 0;
+      while(!stopped.get()) {
+        if(sendDiskInfoCount == 0 && mountPaths != null) {
+          // Rebuild the whole list once per refresh. Clearing inside the loop
+          // discarded every disk except the last one.
+          diskInfos.clear();
+          for(File eachFile: mountPaths) {
+            diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder()
+                .setAbsolutePath(eachFile.getAbsolutePath())
+                .setTotalSpace(eachFile.getTotalSpace())
+                .setFreeSpace(eachFile.getFreeSpace())
+                .setUsableSpace(eachFile.getUsableSpace())
+                .build());
+          }
+        }
+        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+            .addAllDisk(diskInfos)
+            .setRunningTaskNum(0)   //TODO
+            .setSystem(systemInfo)
+            .setDiskSlots(workerDiskSlots)
+            .build();
+
+        TajoMasterProtocol.TajoHeartbeat heartbeatProto = TajoMasterProtocol.TajoHeartbeat.newBuilder()
+            .setTajoWorkerHost(workerContext.getTajoWorkerManagerService().getBindAddr().getHostName())
+            .setTajoWorkerPort(workerContext.getTajoWorkerManagerService().getBindAddr().getPort())
+            .setTajoWorkerClientPort(workerContext.getTajoWorkerClientService().getBindAddr().getPort())
+            .setServerStatus(serverStatus)
+            .build();
+
+        workerContext.getTajoMasterRpcClient().heartbeat(null, heartbeatProto, NullCallback.get());
+        try {
+          Thread.sleep(10 * 1000);
+        } catch (InterruptedException e) {
+          break;
+        }
+        sendDiskInfoCount++;
+
+        // Disk information is refreshed again after the counter wraps to zero.
+        if(sendDiskInfoCount > 10) {
+          sendDiskInfoCount = 0;
+        }
+      }
+
+      LOG.info("Worker Resource Heartbeat Thread stopped.");
+    }
+  }
+
+  /** JVM shutdown hook that stops the worker unless it has been stopped already. */
+  private class ShutdownHook implements Runnable {
+    @Override
+    public void run() {
+      if(!stopped.get()) {
+        LOG.info("============================================");
+        LOG.info("TajoWorker received SIGINT Signal");
+        LOG.info("============================================");
+        stop();
+      }
+    }
+  }
+
+  /**
+   * Initializes and starts the worker, then applies the mode specific start-up.
+   *
+   * @param tajoConf configuration to initialize the worker with
+   * @param args     start-up arguments, forwarded to the mode specific start-up
+   */
+  public void startWorker(TajoConf tajoConf, String[] args) {
+    init(tajoConf);
+    start();
+    setWorkerMode(args);
+  }
+
+  /**
+   * Runs the {@code mount} command and extracts the mount point from every output
+   * line of the form {@code <device> on /<path> ...}. Lines without that pattern
+   * are skipped.
+   *
+   * @return the mount points found in the command output
+   * @throws Exception if the command cannot be started or its output cannot be read
+   */
+  public static List<File> getMountPath() throws Exception {
+    BufferedReader mountOutput = null;
+    try {
+      Process mountProcess = Runtime.getRuntime().exec("mount");
+      mountOutput = new BufferedReader(new InputStreamReader(mountProcess.getInputStream()));
+      List<File> mountPaths = new ArrayList<File>();
+      String line;
+      while ((line = mountOutput.readLine()) != null) {
+        int indexStart = line.indexOf(" on /");
+        if (indexStart < 0) {
+          // No mount point on this line; previously a bogus path was extracted.
+          continue;
+        }
+
+        // Skip " on " so that the path starts at its leading slash.
+        int pathStart = indexStart + 4;
+        int indexEnd = line.indexOf(" ", pathStart);
+
+        // A line ending right after the path has no terminating space.
+        String mountPath = indexEnd < 0
+            ? line.substring(pathStart)
+            : line.substring(pathStart, indexEnd);
+        mountPaths.add(new File(mountPath));
+      }
+      return mountPaths;
+    } finally {
+      if(mountOutput != null) {
+        mountOutput.close();
+      }
+    }
+  }
+
+  /**
+   * @return the total physical memory reported by the operating system MXBean,
+   *         converted from bytes to megabytes
+   */
+  public static int getTotalMemoryMB() {
+    com.sun.management.OperatingSystemMXBean bean =
+        (com.sun.management.OperatingSystemMXBean)
+            java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+    long totalBytes = bean.getTotalPhysicalMemorySize();
+    // Divide in long arithmetic before narrowing; dividing by 1024 only yielded
+    // kilobytes, not megabytes.
+    return (int)(totalBytes / (1024L * 1024L));
+  }
+
+  /**
+   * Entry point of the worker daemon.
+   *
+   * @param args {@code args[0]} is the daemon mode; the whole array is forwarded to
+   *             the worker start-up
+   */
+  public static void main(String[] args) throws Exception {
+    StringUtils.startupShutdownMessage(TajoWorker.class, args, LOG);
+
+    if(args.length < 1) {
+      LOG.error("Wrong startup params");
+      System.exit(-1);
+    }
+
+    String workerMode = args[0];
+
+    try {
+      TajoWorker tajoWorker = new TajoWorker(workerMode);
+      tajoWorker.startWorker(new TajoConf(new YarnConfiguration()), args);
+    } catch (Throwable t) {
+      LOG.fatal("Error starting TajoWorker", t);
+      System.exit(-1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
new file mode 100644
index 0000000..3969b36
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.QueryMasterClientProtocol;
+import org.apache.tajo.master.querymaster.Query;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+
+public class TajoWorkerClientService extends AbstractService {
+  private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class);
+  private final PrimitiveProtos.BoolProto BOOL_TRUE =
+          PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
+  private final PrimitiveProtos.BoolProto BOOL_FALSE =
+          PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
+
+  private ProtoBlockingRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+  private Configuration conf;
+  private TajoWorker.WorkerContext workerContext;
+  private TajoWorkerClientProtocolServiceHandler serviceHandler;
+
+  public TajoWorkerClientService(TajoWorker.WorkerContext workerContext, int port) {
+    super(TajoWorkerClientService.class.getName());
+
+    this.port = port;
+    this.workerContext = workerContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    this.serviceHandler = new TajoWorkerClientProtocolServiceHandler();
+
+    // init RPC Server in constructor cause Heartbeat Thread use bindAddr
+    // Setup RPC server
+    try {
+      // TODO initial port num is value of config and find unused port with sequence
+      InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      // TODO blocking/non-blocking??
+      this.rpcServer = new ProtoBlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa);
+      this.rpcServer.start();
+
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      this.port = bindAddr.getPort();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+    // Get the master address
+    LOG.info(TajoWorkerClientService.class.getSimpleName() + " is bind to " + addr);
+    //queryConf.setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("====> TajoWorkerClientService stopping");
+    if(rpcServer != null) {
+      rpcServer.shutdown();
+    }
+    LOG.info("====> TajoWorkerClientService stopped");
+    super.stop();
+  }
+
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  public class TajoWorkerClientProtocolServiceHandler
+          implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface {
+    @Override
+    public PrimitiveProtos.BoolProto updateSessionVariables(
+            RpcController controller,
+            ClientProtos.UpdateSessionVariableRequest request) throws ServiceException {
+      return null;
+    }
+
+    @Override
+    public ClientProtos.GetQueryResultResponse getQueryResult(
+            RpcController controller,
+            ClientProtos.GetQueryResultRequest request) throws ServiceException {
+      QueryId queryId = new QueryId(request.getQueryId());
+      Query query = workerContext.getQueryMaster().getQuery(queryId);
+
+      ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder();
+
+      if(query == null) {
+        builder.setErrorMessage("No Query for " + queryId);
+      } else {
+        switch (query.getState()) {
+          case QUERY_SUCCEEDED:
+            builder.setTableDesc((CatalogProtos.TableDescProto)query.getResultDesc().getProto());
+            break;
+          case QUERY_FAILED:
+          case QUERY_ERROR:
+            builder.setErrorMessage("Query " + queryId + " is failed");
+          default:
+            builder.setErrorMessage("Query " + queryId + " is still running");
+        }
+      }
+      return builder.build();
+    }
+
+    @Override
+    public ClientProtos.GetQueryStatusResponse getQueryStatus(
+            RpcController controller,
+            ClientProtos.GetQueryStatusRequest request) throws ServiceException {
+      ClientProtos.GetQueryStatusResponse.Builder builder
+              = ClientProtos.GetQueryStatusResponse.newBuilder();
+      QueryId queryId = new QueryId(request.getQueryId());
+
+      builder.setQueryId(request.getQueryId());
+
+      if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+      } else {
+        QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId);
+        builder.setResultCode(ClientProtos.ResultCode.OK);
+        builder.setQueryMasterHost(bindAddr.getHostName());
+        builder.setQueryMasterPort(bindAddr.getPort());
+
+        if (queryMasterTask != null) {
+          queryMasterTask.touchSessionTime();
+          Query query = queryMasterTask.getQuery();
+
+          builder.setState(query.getState());
+          builder.setProgress(query.getProgress());
+          builder.setSubmitTime(query.getAppSubmitTime());
+          builder.setInitTime(query.getInitializationTime());
+          builder.setHasResult(!queryMasterTask.getQueryContext().isCreateTableQuery());
+          if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+            builder.setFinishTime(query.getFinishTime());
+          } else {
+            builder.setFinishTime(System.currentTimeMillis());
+          }
+        } else {
+          builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED);
+        }
+      }
+
+      return builder.build();
+    }
+
+    @Override
+    public PrimitiveProtos.BoolProto killQuery (
+            RpcController controller,
+            TajoIdProtos.QueryIdProto request) throws ServiceException {
+      final QueryId queryId = new QueryId(request);
+      LOG.info("Stop Query:" + queryId);
+      Thread t = new Thread() {
+        public void run() {
+//          try {
+//            Thread.sleep(1000);   //wait tile return to rpc response
+//          } catch (InterruptedException e) {
+//          }
+          workerContext.getQueryMaster().getContext().stopQuery(queryId);
+        }
+      };
+      t.start();
+      return BOOL_TRUE;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
new file mode 100644
index 0000000..a48339a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.master.TaskSchedulerImpl;
+import org.apache.tajo.master.event.*;
+import org.apache.tajo.master.querymaster.QueryMaster;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.rpc.ProtoAsyncRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.util.NetUtils;
+
+import java.net.InetSocketAddress;
+
+public class TajoWorkerManagerService extends CompositeService
+    implements TajoWorkerProtocol.TajoWorkerProtocolService.Interface {
+  private static final Log LOG = LogFactory.getLog(TajoWorkerManagerService.class.getName());
+
+  private ProtoAsyncRpcServer rpcServer;
+  private InetSocketAddress bindAddr;
+  private String addr;
+  private int port;
+
+  private QueryMaster queryMaster;
+
+  private TajoWorker.WorkerContext workerContext;
+
+  /**
+   * Creates the service, registering it under this class's name.
+   *
+   * @param workerContext worker context kept for later use (it is handed to the
+   *                      {@link QueryMaster} created in {@link #init(Configuration)})
+   * @param port          port the RPC server is asked to bind to in {@link #init(Configuration)}
+   */
+  public TajoWorkerManagerService(TajoWorker.WorkerContext workerContext, int port) {
+    super(TajoWorkerManagerService.class.getName());
+    this.workerContext = workerContext;
+    this.port = port;
+  }
+
+  /**
+   * Returns the {@link QueryMaster} created by {@link #init(Configuration)}.
+   *
+   * @return the query master, or {@code null} if {@code init} has not created it yet
+   */
+  public QueryMaster getQueryMaster() {
+    return queryMaster;
+  }
+
+  /**
+   * Starts the RPC server on the configured port, creates the {@link QueryMaster}
+   * child service and publishes the bound address in the configuration.
+   *
+   * After a successful call {@code bindAddr}, {@code addr} and {@code port} reflect the
+   * address the RPC server is actually listening on.
+   *
+   * @param conf configuration; it is cast to {@link TajoConf} to store the listener address
+   * @throws IllegalStateException if the RPC server or the query master cannot be set up.
+   *         Previously the failure was only logged and initialization went on with a
+   *         {@code null} address, leaving a service that looked initialized but could not
+   *         serve any request.
+   */
+  @Override
+  public void init(Configuration conf) {
+    try {
+      // Setup RPC server
+      InetSocketAddress initIsa =
+          new InetSocketAddress("0.0.0.0", port);
+      if (initIsa.getAddress() == null) {
+        throw new IllegalArgumentException("Failed resolve of " + initIsa);
+      }
+
+      this.rpcServer = new ProtoAsyncRpcServer(TajoWorkerProtocol.class, this, initIsa);
+      this.rpcServer.start();
+
+      // The server may have been bound to a different port than requested
+      // (e.g. when the requested port is 0), so re-read the effective address.
+      this.bindAddr = NetUtils.getConnectAddress(rpcServer.getListenAddress());
+      this.addr = bindAddr.getHostName() + ":" + bindAddr.getPort();
+
+      this.port = bindAddr.getPort();
+
+      queryMaster = new QueryMaster(workerContext);
+      addService(queryMaster);
+
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      // Fail fast: without a bound RPC server and a query master this service is unusable.
+      throw new IllegalStateException(
+          "Failed to initialize TajoWorkerManagerService on port " + port, e);
+    }
+    // Publish the address this service listens on
+    LOG.info("TajoWorkerManagerService is bind to " + addr);
+    ((TajoConf)conf).setVar(TajoConf.ConfVars.TASKRUNNER_LISTENER_ADDRESS, addr);
+
+    super.init(conf);
+  }
+
+  /**
+   * Starts this service by delegating to the superclass; no additional work is done here.
+   */
+  @Override
+  public void start() {
+    super.start();
+  }
+
+  /**
+   * Shuts down the RPC server, if one was created, and then stops this service
+   * through the superclass.
+   */
+  @Override
+  public void stop() {
+    final ProtoAsyncRpcServer server = rpcServer;
+    if (server != null) {
+      server.shutdown();
+    }
+    LOG.info("TajoWorkerManagerService stopped");
+    super.stop();
+  }
+
+  /**
+   * Returns the address derived in {@link #init(Configuration)} from the RPC server's
+   * listen address.
+   *
+   * @return the bound address, or {@code null} if {@code init} has not set it
+   */
+  public InetSocketAddress getBindAddr() {
+    return bindAddr;
+  }
+
+  /**
+   * Formats the bound address as {@code hostname:port}.
+   *
+   * @return host name and port of {@code bindAddr}, separated by a colon
+   */
+  public String getHostAndPort() {
+    final String host = bindAddr.getHostName();
+    final int boundPort = bindAddr.getPort();
+    return host + ":" + boundPort;
+  }
+
+  /**
+   * Handles a task request coming from a task runner.
+   *
+   * If no {@link QueryMasterTask} is registered for the query of the requested execution
+   * block, or that task is stopped, the caller is answered with
+   * {@code TaskSchedulerImpl.stopTaskRunnerReq}. Otherwise the request is forwarded to the
+   * query master task as a {@link TaskRequestEvent} that carries the callback.
+   *
+   * @param controller RPC controller (not used here)
+   * @param request    identifies the execution block and the requesting container
+   * @param done       callback that receives the task request answer
+   */
+  @Override
+  public void getTask(RpcController controller, TajoWorkerProtocol.GetTaskRequestProto request,
+                      RpcCallback<TajoWorkerProtocol.QueryUnitRequestProto> done) {
+    try {
+      ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
+      QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
+
+      // The null check must come before any use of queryMasterTask: the container id is
+      // built through the task's resource allocator, which previously raised a
+      // NullPointerException for unknown queries and left the caller without an answer.
+      if(queryMasterTask == null) {
+        LOG.info("====>getTask: ebId:" + ebId + ", but query is finished.");
+        done.run(TaskSchedulerImpl.stopTaskRunnerReq);
+        return;
+      }
+
+      ContainerId cid =
+          queryMasterTask.getQueryContext().getResourceAllocator().makeContainerId(request.getContainerId());
+
+      if(queryMasterTask.isStopped()) {
+        LOG.info("====>getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
+        done.run(TaskSchedulerImpl.stopTaskRunnerReq);
+      } else {
+        LOG.info("====>getTask:" + cid + ", ebId:" + ebId);
+        queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
+      }
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Forwards a task status report to the query master task of the owning query as a
+   * {@link TaskAttemptStatusUpdateEvent}.
+   *
+   * @param controller RPC controller (not used here)
+   * @param request    status report of a task attempt
+   * @param done       receives {@code TRUE_PROTO} when the event was handed over,
+   *                   {@code FALSE_PROTO} when any exception occurred
+   */
+  @Override
+  public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatusProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId ownerQueryId =
+          new QueryId(request.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryMasterTask ownerTask = queryMaster.getQueryMasterTask(ownerQueryId);
+      ownerTask.getEventHandler().handle(
+          new TaskAttemptStatusUpdateEvent(new QueryUnitAttemptId(request.getId()), request));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /**
+   * Acknowledges a ping by answering {@code TRUE_PROTO}.
+   * The attempt id is currently ignored; see the TODO below.
+   *
+   * @param controller RPC controller (not used here)
+   * @param attemptId  id of the pinging task attempt (not used yet)
+   * @param done       always receives {@code TRUE_PROTO}
+   */
+  @Override
+  public void ping(RpcController controller,
+                   TajoIdProtos.QueryUnitAttemptIdProto attemptId,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    // TODO - to be completed: the disabled code below sketches resetting the
+    // expire time of the pinged attempt.
+//      QueryUnitAttemptId attemptId = new QueryUnitAttemptId(attemptIdProto);
+//    context.getQuery(attemptId.getQueryId()).getSubQuery(attemptId.getExecutionBlockId()).
+//        getQueryUnit(attemptId.getQueryUnitId()).getAttempt(attemptId).
+//        resetExpireTime();
+    done.run(TajoWorker.TRUE_PROTO);
+  }
+
+  /**
+   * Forwards a fatal error report to the query master task of the owning query as a
+   * {@link TaskFatalErrorEvent}.
+   *
+   * @param controller RPC controller (not used here)
+   * @param report     fatal error report of a task attempt
+   * @param done       receives {@code TRUE_PROTO} when the event was handed over,
+   *                   {@code FALSE_PROTO} when any exception occurred
+   */
+  @Override
+  public void fatalError(RpcController controller, TajoWorkerProtocol.TaskFatalErrorReport report,
+                         RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId ownerQueryId =
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryMasterTask ownerTask = queryMaster.getQueryMasterTask(ownerQueryId);
+      ownerTask.getEventHandler().handle(new TaskFatalErrorEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /**
+   * Forwards a task completion report to the query master task of the owning query as a
+   * {@link TaskCompletionEvent}.
+   *
+   * @param controller RPC controller (not used here)
+   * @param report     completion report of a task attempt
+   * @param done       receives {@code TRUE_PROTO} when the event was handed over,
+   *                   {@code FALSE_PROTO} when any exception occurred
+   */
+  @Override
+  public void done(RpcController controller, TajoWorkerProtocol.TaskCompletionReport report,
+                   RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      QueryId ownerQueryId =
+          new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId());
+      QueryMasterTask ownerTask = queryMaster.getQueryMasterTask(ownerQueryId);
+      ownerTask.getEventHandler().handle(new TaskCompletionEvent(report));
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  /**
+   * Hands a query execution request to the query master as a {@link QueryStartEvent}
+   * built from the query id and the logical plan JSON carried by the request.
+   *
+   * @param controller RPC controller (not used here)
+   * @param request    query id and logical plan of the query to run
+   * @param done       receives {@code TRUE_PROTO} when the event was handed over,
+   *                   {@code FALSE_PROTO} when any exception occurred
+   */
+  @Override
+  public void executeQuery(RpcController controller,
+                           TajoWorkerProtocol.QueryExecutionRequestProto request,
+                           RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      final QueryId requestedId = new QueryId(request.getQueryId());
+      LOG.info("====>Receive executeQuery request:" + requestedId);
+      final QueryStartEvent startEvent =
+          new QueryStartEvent(requestedId, request.getLogicalPlanJson().getValue());
+      queryMaster.handle(startEvent);
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+
+  @Override
+  public void executeExecutionBlock(RpcController controller,
+                                    TajoWorkerProtocol.RunExecutionBlockRequestProto request,
+                                    RpcCallback<PrimitiveProtos.BoolProto> done) {
+    try {
+      String[] params = new String[7];
+      params[0] = "standby";  //mode(never used)
+      params[1] = request.getExecutionBlockId();
+      // NodeId has a form of hostname:port.
+      params[2] = request.getNodeId();
+      params[3] = request.getContainerId();
+
+      // QueryMaster's address
+      params[4] = request.getQueryMasterHost();
+      params[5] = String.valueOf(request.getQueryMasterPort());
+      params[6] = request.getQueryOutputPath();
+      workerContext.getTaskRunnerManager().startTask(params);
+      done.run(TajoWorker.TRUE_PROTO);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      done.run(TajoWorker.FALSE_PROTO);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/d48f2667/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 94f1662..f9cc82c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -37,7 +37,6 @@ import org.apache.tajo.TaskAttemptContext;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStat;
-import org.apache.tajo.ipc.QueryMasterProtocol.*;
 import org.apache.tajo.engine.exception.UnfinishedTaskException;
 import org.apache.tajo.engine.json.CoreGsonHelper;
 import org.apache.tajo.engine.planner.PlannerUtil;
@@ -45,14 +44,15 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.logical.SortNode;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService.Interface;
+import org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
 import org.apache.tajo.master.ExecutionBlock.PartitionType;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.storage.Fragment;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.worker.TaskRunner.WorkerContext;
+import org.apache.tajo.util.ApplicationIdUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -70,7 +70,7 @@ public class Task {
 
   private final QueryConf conf;
   private final FileSystem localFS;
-  private final WorkerContext workerContext;
+  private final TaskRunner.TaskRunnerContext taskRunnerContext;
   private final Interface masterProxy;
   private final LocalDirAllocator lDirAllocator;
   private final QueryUnitAttemptId taskId;
@@ -128,19 +128,19 @@ public class Task {
       };
 
   public Task(QueryUnitAttemptId taskId,
-              final WorkerContext worker, final Interface masterProxy,
+              final TaskRunner.TaskRunnerContext worker, final Interface masterProxy,
               final QueryUnitRequest request) throws IOException {
     this.request = request;
     this.reporter = new Reporter(masterProxy);
     this.reporter.startCommunicationThread();
 
     this.taskId = request.getId();
-    this.conf = worker.getConf();
-    this.workerContext = worker;
+    this.conf = worker.getQueryConf();
+    this.taskRunnerContext = worker;
     this.masterProxy = masterProxy;
     this.localFS = worker.getLocalFS();
     this.lDirAllocator = worker.getLocalDirAllocator();
-    this.taskDir = StorageUtil.concatPath(workerContext.getBaseDir(),
+    this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
         taskId.getQueryUnitId().getId() + "_" + taskId.getId());
 
     this.context = new TaskAttemptContext(conf, taskId,
@@ -162,7 +162,7 @@ public class Task {
       // where ss is the subquery id associated with this task, and nnnnnn is the task id.
       Path outFilePath = new Path(conf.getOutputPath(),
           OUTPUT_FILE_PREFIX +
-          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getSubQueryId().getId()) + "-" +
+          OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
           OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
       LOG.info("Output File Path: " + outFilePath);
       context.setOutputPath(outFilePath);
@@ -184,8 +184,10 @@ public class Task {
       LOG.info("==> Table Id: " + f.getName() + ", url: " + f.getUrls());
     }
     LOG.info("* Local task dir: " + taskDir);
-    LOG.info("* plan:\n");
-    LOG.info(plan.toString());
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("* plan:\n");
+      LOG.debug(plan.toString());
+    }
     LOG.info("==================================");
   }
 
@@ -207,7 +209,6 @@ public class Task {
         }
       }
     }
-
     // for localizing the intermediate data
     localize(request);
   }
@@ -243,7 +244,7 @@ public class Task {
       int i = fetcherRunners.size();
       for (Fragment cache : cached) {
         inFile = new Path(inputTableBaseDir, "in_" + i);
-        workerContext.getDefaultFS().copyToLocalFile(cache.getPath(), inFile);
+        taskRunnerContext.getDefaultFS().copyToLocalFile(cache.getPath(), inFile);
         cache.setPath(inFile);
         i++;
       }
@@ -273,7 +274,7 @@ public class Task {
 
   public void fetch() {
     for (Fetcher f : fetcherRunners) {
-      workerContext.getFetchLauncher().submit(new FetchRunner(context, f));
+      taskRunnerContext.getFetchLauncher().submit(new FetchRunner(context, f));
     }
   }
 
@@ -298,8 +299,8 @@ public class Task {
         // context.getWorkDir() 지우기
         localFS.delete(context.getWorkDir(), true);
         // tasks에서 자기 지우기
-        synchronized (workerContext.getTasks()) {
-          workerContext.getTasks().remove(this.getId());
+        synchronized (taskRunnerContext.getTasks()) {
+          taskRunnerContext.getTasks().remove(this.getId());
         }
       } catch (IOException e) {
         e.printStackTrace();
@@ -312,7 +313,7 @@ public class Task {
 
   public TaskStatusProto getReport() {
     TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
-    builder.setWorkerName(workerContext.getNodeId());
+    builder.setWorkerName(taskRunnerContext.getNodeId());
     builder.setId(context.getTaskId().getProto())
         .setProgress(context.getProgress()).setState(context.getState());
 
@@ -368,7 +369,7 @@ public class Task {
       }
 
       if (context.getFragmentSize() > 0) {
-        this.executor = workerContext.getTQueryEngine().
+        this.executor = taskRunnerContext.getTQueryEngine().
             createPlan(context, plan);
         this.executor.init();
         while(executor.next() != null && !killed) {
@@ -430,7 +431,7 @@ public class Task {
   }
 
   public void cleanupTask() {
-    workerContext.getTasks().remove(getId());
+    taskRunnerContext.getTasks().remove(getId());
   }
 
   public int hashCode() {
@@ -623,7 +624,7 @@ public class Task {
   String fileCache;
   public String getFileCacheDir() {
     fileCache = USERCACHE + "/" + "hyunsik" + "/" + APPCACHE + "/" +
-        ConverterUtils.toString(taskId.getQueryId().getApplicationId()) +
+        ConverterUtils.toString(ApplicationIdUtils.queryIdToAppId(taskId.getQueryUnitId().getExecutionBlockId().getQueryId())) +
         "/" + "output";
     return fileCache;
   }
@@ -631,7 +632,7 @@ public class Task {
   public static Path getTaskAttemptDir(QueryUnitAttemptId quid) {
     Path workDir =
         StorageUtil.concatPath(
-            quid.getSubQueryId().toString(),
+            quid.getQueryUnitId().getExecutionBlockId().toString(),
             String.valueOf(quid.getQueryUnitId().getId()),
             String.valueOf(quid.getId()));
     return workDir;


Mime
View raw message