tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject [05/52] [abbrv] tajo git commit: TAJO-1397: Resource allocation should be fine grained. (jinho)
Date Wed, 22 Jul 2015 13:00:38 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
deleted file mode 100644
index f97ce29..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
+++ /dev/null
@@ -1,844 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import org.apache.commons.lang.exception.ExceptionUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.TajoProtos;
-import org.apache.tajo.TajoProtos.TaskAttemptState;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.engine.query.TaskRequest;
-import org.apache.tajo.ipc.QueryMasterProtocol;
-import org.apache.tajo.ipc.TajoWorkerProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty.EnforceType;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.plan.function.python.TajoScriptEngine;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.serder.LogicalNodeDeserializer;
-import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.pullserver.TajoPullServerService;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.NetUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.URI;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-
-@Deprecated
-public class LegacyTaskImpl implements Task {
-  private static final Log LOG = LogFactory.getLog(LegacyTaskImpl.class);
-  private static final float FETCHER_PROGRESS = 0.5f;
-
-  private final TajoConf systemConf;
-  private final QueryContext queryContext;
-  private final ExecutionBlockContext executionBlockContext;
-  private final String taskRunnerId;
-
-  private final Path taskDir;
-  private final TaskRequest request;
-  private TaskAttemptContext context;
-  private List<Fetcher> fetcherRunners;
-  private LogicalNode plan;
-  private final Map<String, TableDesc> descs = Maps.newHashMap();
-  private PhysicalExec executor;
-  private boolean interQuery;
-  private Path inputTableBaseDir;
-
-  private long startTime;
-  private long finishTime;
-
-  private final TableStats inputStats;
-  private List<FileChunk> localChunks;
-
-  // TODO - to be refactored
-  private ShuffleType shuffleType = null;
-  private Schema finalSchema = null;
-  private TupleComparator sortComp = null;
-
-  public LegacyTaskImpl(String taskRunnerId,
-                        Path baseDir,
-                        TaskAttemptId taskId,
-                        final ExecutionBlockContext executionBlockContext,
-                        final TaskRequest request) throws IOException {
-    this(taskRunnerId, baseDir, taskId, executionBlockContext.getConf(), executionBlockContext, request);
-  }
-
-  public LegacyTaskImpl(String taskRunnerId,
-                        Path baseDir,
-                        TaskAttemptId taskId,
-                        TajoConf conf,
-                        final ExecutionBlockContext executionBlockContext,
-                        final TaskRequest request) throws IOException {
-    this.taskRunnerId = taskRunnerId;
-    this.request = request;
-
-    this.systemConf = conf;
-    this.queryContext = request.getQueryContext(systemConf);
-    this.executionBlockContext = executionBlockContext;
-    this.taskDir = StorageUtil.concatPath(baseDir,
-        taskId.getTaskId().getId() + "_" + taskId.getId());
-
-    this.context = new TaskAttemptContext(queryContext, executionBlockContext, taskId,
-        request.getFragments().toArray(new FragmentProto[request.getFragments().size()]), taskDir);
-    this.context.setDataChannel(request.getDataChannel());
-    this.context.setEnforcer(request.getEnforcer());
-    this.context.setState(TaskAttemptState.TA_PENDING);
-    this.inputStats = new TableStats();
-    this.fetcherRunners = Lists.newArrayList();
-  }
-
-  public void initPlan() throws IOException {
-    plan = LogicalNodeDeserializer.deserialize(queryContext, context.getEvalContext(), request.getPlan());
-    LogicalNode [] scanNode = PlannerUtil.findAllNodes(plan, NodeType.SCAN);
-    if (scanNode != null) {
-      for (LogicalNode node : scanNode) {
-        ScanNode scan = (ScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
-
-    LogicalNode [] partitionScanNode = PlannerUtil.findAllNodes(plan, NodeType.PARTITIONS_SCAN);
-    if (partitionScanNode != null) {
-      for (LogicalNode node : partitionScanNode) {
-        PartitionedTableScanNode scan = (PartitionedTableScanNode) node;
-        descs.put(scan.getCanonicalName(), scan.getTableDesc());
-      }
-    }
-
-    interQuery = request.getProto().getInterQuery();
-    if (interQuery) {
-      context.setInterQuery();
-      this.shuffleType = context.getDataChannel().getShuffleType();
-
-      if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-        SortNode sortNode = PlannerUtil.findTopNode(plan, NodeType.SORT);
-        this.finalSchema = PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys());
-        this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
-      }
-    } else {
-      Path outFilePath = ((FileTablespace) TablespaceManager.get(queryContext.getStagingDir().toUri()).get())
-          .getAppenderFilePath(getId(), queryContext.getStagingDir());
-      LOG.info("Output File Path: " + outFilePath);
-      context.setOutputPath(outFilePath);
-    }
-
-    this.localChunks = Collections.synchronizedList(new ArrayList<FileChunk>());
-    LOG.info("==================================");
-    LOG.info("* Stage " + request.getId() + " is initialized");
-    LOG.info("* InterQuery: " + interQuery
-        + (interQuery ? ", Use " + this.shuffleType + " shuffle" : "") +
-        ", Fragments (num: " + request.getFragments().size() + ")" +
-        ", Fetches (total:" + request.getFetches().size() + ") :");
-
-    if(LOG.isDebugEnabled()) {
-      for (FetchImpl f : request.getFetches()) {
-        LOG.debug("Table Id: " + f.getName() + ", Simple URIs: " + f.getSimpleURIs());
-      }
-    }
-    LOG.info("* Local task dir: " + taskDir);
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("* plan:\n");
-      LOG.debug(plan.toString());
-    }
-    LOG.info("==================================");
-  }
-
-  private void startScriptExecutors() throws IOException {
-    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
-      executor.start(systemConf);
-    }
-  }
-
-  private void stopScriptExecutors() {
-    for (TajoScriptEngine executor : context.getEvalContext().getAllScriptEngines()) {
-      executor.shutdown();
-    }
-  }
-
-  @Override
-  public void init() throws IOException {
-    initPlan();
-    startScriptExecutors();
-
-    if (context.getState() == TaskAttemptState.TA_PENDING) {
-      // initialize a task temporal dir
-      FileSystem localFS = executionBlockContext.getLocalFS();
-      localFS.mkdirs(taskDir);
-
-      if (request.getFetches().size() > 0) {
-        inputTableBaseDir = localFS.makeQualified(
-            executionBlockContext.getLocalDirAllocator().getLocalPathForWrite(
-                getTaskAttemptDir(context.getTaskId()).toString(), systemConf));
-        localFS.mkdirs(inputTableBaseDir);
-        Path tableDir;
-        for (String inputTable : context.getInputTables()) {
-          tableDir = new Path(inputTableBaseDir, inputTable);
-          if (!localFS.exists(tableDir)) {
-            LOG.info("the directory is created  " + tableDir.toUri());
-            localFS.mkdirs(tableDir);
-          }
-        }
-      }
-      // for localizing the intermediate data
-      fetcherRunners.addAll(getFetchRunners(context, request.getFetches()));
-    }
-  }
-
-  private TaskAttemptId getId() {
-    return context.getTaskId();
-  }
-
-  public String toString() {
-    return "queryId: " + this.getId() + " status: " + context.getState();
-  }
-
-  @Override
-  public boolean isStopped() {
-    return context.isStopped();
-  }
-
-  @Override
-  public TaskAttemptContext getTaskContext() {
-    return context;
-  }
-
-  @Override
-  public ExecutionBlockContext getExecutionBlockContext() {
-    return executionBlockContext;
-  }
-
-  @Override
-  public boolean hasFetchPhase() {
-    return fetcherRunners.size() > 0;
-  }
-
-  @Override
-  public void fetch() {
-    ExecutorService executorService = executionBlockContext.getTaskRunner(taskRunnerId).getFetchLauncher();
-    for (Fetcher f : fetcherRunners) {
-      executorService.submit(new FetchRunner(context, f));
-    }
-  }
-
-  @Override
-  public void kill() {
-    stopScriptExecutors();
-    context.setState(TaskAttemptState.TA_KILLED);
-    context.stop();
-  }
-
-  @Override
-  public void abort() {
-    stopScriptExecutors();
-    context.setState(TajoProtos.TaskAttemptState.TA_FAILED);
-    context.stop();
-  }
-
-  @Override
-  public TaskStatusProto getReport() {
-    TaskStatusProto.Builder builder = TaskStatusProto.newBuilder();
-    builder.setWorkerName(executionBlockContext.getWorkerContext().getConnectionInfo().getHostAndPeerRpcPort());
-    builder.setId(context.getTaskId().getProto())
-        .setProgress(context.getProgress())
-        .setState(context.getState());
-
-    builder.setInputStats(reloadInputStats());
-
-    if (context.getResultStats() != null) {
-      builder.setResultStats(context.getResultStats().getProto());
-    }
-    return builder.build();
-  }
-
-  @Override
-  public boolean isProgressChanged() {
-    return context.isProgressChanged();
-  }
-
-  @Override
-  public void updateProgress() {
-    if(context != null && context.isStopped()){
-      return;
-    }
-
-    if (executor != null && context.getProgress() < 1.0f) {
-      context.setExecutorProgress(executor.getProgress());
-    }
-  }
-
-  private CatalogProtos.TableStatsProto reloadInputStats() {
-    synchronized(inputStats) {
-      if (this.executor == null) {
-        return inputStats.getProto();
-      }
-
-      TableStats executorInputStats = this.executor.getInputStats();
-
-      if (executorInputStats != null) {
-        inputStats.setValues(executorInputStats);
-      }
-      return inputStats.getProto();
-    }
-  }
-
-  private TaskCompletionReport getTaskCompletionReport() {
-    TaskCompletionReport.Builder builder = TaskCompletionReport.newBuilder();
-    builder.setId(context.getTaskId().getProto());
-
-    builder.setInputStats(reloadInputStats());
-
-    if (context.hasResultStats()) {
-      builder.setResultStats(context.getResultStats().getProto());
-    } else {
-      builder.setResultStats(new TableStats().getProto());
-    }
-
-    Iterator<Entry<Integer, String>> it = context.getShuffleFileOutputs();
-    if (it.hasNext()) {
-      do {
-        Entry<Integer, String> entry = it.next();
-        ShuffleFileOutput.Builder part = ShuffleFileOutput.newBuilder();
-        part.setPartId(entry.getKey());
-
-        // Set output volume
-        if (context.getPartitionOutputVolume() != null) {
-          for (Entry<Integer, Long> e : context.getPartitionOutputVolume().entrySet()) {
-            if (entry.getKey().equals(e.getKey())) {
-              part.setVolume(e.getValue().longValue());
-              break;
-            }
-          }
-        }
-
-        builder.addShuffleFileOutputs(part.build());
-      } while (it.hasNext());
-    }
-
-    return builder.build();
-  }
-
-  private void waitForFetch() throws InterruptedException, IOException {
-    context.getFetchLatch().await();
-    LOG.info(context.getTaskId() + " All fetches are done!");
-    Collection<String> inputs = Lists.newArrayList(context.getInputTables());
-
-    // Get all broadcasted tables
-    Set<String> broadcastTableNames = new HashSet<String>();
-    List<EnforceProperty> broadcasts = context.getEnforcer().getEnforceProperties(EnforceType.BROADCAST);
-    if (broadcasts != null) {
-      for (EnforceProperty eachBroadcast : broadcasts) {
-        broadcastTableNames.add(eachBroadcast.getBroadcast().getTableName());
-      }
-    }
-
-    // localize the fetched data and skip the broadcast table
-    for (String inputTable: inputs) {
-      if (broadcastTableNames.contains(inputTable)) {
-        continue;
-      }
-      File tableDir = new File(context.getFetchIn(), inputTable);
-      FileFragment[] frags = localizeFetchedData(tableDir, inputTable, descs.get(inputTable).getMeta());
-      context.updateAssignedFragments(inputTable, frags);
-    }
-  }
-
-  @Override
-  public void run() throws Exception {
-    startTime = System.currentTimeMillis();
-    Throwable error = null;
-    try {
-      if(!context.isStopped()) {
-        context.setState(TaskAttemptState.TA_RUNNING);
-        if (context.hasFetchPhase()) {
-          // If the fetch is still in progress, the query unit must wait for
-          // complete.
-          waitForFetch();
-          context.setFetcherProgress(FETCHER_PROGRESS);
-          updateProgress();
-        }
-
-        this.executor = executionBlockContext.getTQueryEngine().
-            createPlan(context, plan);
-        this.executor.init();
-
-        while(!context.isStopped() && executor.next() != null) {
-        }
-      }
-    } catch (Throwable e) {
-      error = e ;
-      LOG.error(e.getMessage(), e);
-      stopScriptExecutors();
-      context.stop();
-    } finally {
-      if (executor != null) {
-        try {
-          executor.close();
-          reloadInputStats();
-        } catch (IOException e) {
-          LOG.error(e, e);
-        }
-        this.executor = null;
-      }
-
-      executionBlockContext.completedTasksNum.incrementAndGet();
-      context.getHashShuffleAppenderManager().finalizeTask(getId());
-
-      QueryMasterProtocol.QueryMasterProtocolService.Interface queryMasterStub = executionBlockContext.getStub();
-      if (context.isStopped()) {
-        context.setExecutorProgress(0.0f);
-
-        if (context.getState() == TaskAttemptState.TA_KILLED) {
-          queryMasterStub.statusUpdate(null, getReport(), NullCallback.get());
-          executionBlockContext.killedTasksNum.incrementAndGet();
-        } else {
-          context.setState(TaskAttemptState.TA_FAILED);
-          TaskFatalErrorReport.Builder errorBuilder =
-              TaskFatalErrorReport.newBuilder()
-                  .setId(getId().getProto());
-          if (error != null) {
-            if (error.getMessage() == null) {
-              errorBuilder.setErrorMessage(error.getClass().getCanonicalName());
-            } else {
-              errorBuilder.setErrorMessage(error.getMessage());
-            }
-            errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error));
-          }
-
-          queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get());
-          executionBlockContext.failedTasksNum.incrementAndGet();
-        }
-      } else {
-        // if successful
-        context.stop();
-        context.setProgress(1.0f);
-        context.setState(TaskAttemptState.TA_SUCCEEDED);
-        executionBlockContext.succeededTasksNum.incrementAndGet();
-
-        TaskCompletionReport report = getTaskCompletionReport();
-        queryMasterStub.done(null, report, NullCallback.get());
-      }
-      finishTime = System.currentTimeMillis();
-      LOG.info(context.getTaskId() + " completed. " +
-          "Worker's task counter - total:" + executionBlockContext.completedTasksNum.intValue() +
-          ", succeeded: " + executionBlockContext.succeededTasksNum.intValue()
-          + ", killed: " + executionBlockContext.killedTasksNum.intValue()
-          + ", failed: " + executionBlockContext.failedTasksNum.intValue());
-    }
-  }
-
-  @Override
-  public void cleanup() {
-    TaskHistory taskHistory = createTaskHistory();
-    executionBlockContext.addTaskHistory(taskRunnerId, getId(), taskHistory);
-    executionBlockContext.getTasks().remove(getId());
-
-    fetcherRunners.clear();
-    fetcherRunners = null;
-    try {
-      if(executor != null) {
-        executor.close();
-        executor = null;
-      }
-    } catch (IOException e) {
-      LOG.fatal(e.getMessage(), e);
-    }
-
-    executionBlockContext.getWorkerContext().getTaskHistoryWriter().appendHistory(taskHistory);
-    stopScriptExecutors();
-  }
-
-  public TaskHistory createTaskHistory() {
-    TaskHistory taskHistory = null;
-    try {
-      taskHistory = new TaskHistory(context.getTaskId(), context.getState(), context.getProgress(),
-          startTime, finishTime, reloadInputStats());
-
-      if (context.getOutputPath() != null) {
-        taskHistory.setOutputPath(context.getOutputPath().toString());
-      }
-
-      if (context.getWorkDir() != null) {
-        taskHistory.setWorkingPath(context.getWorkDir().toString());
-      }
-
-      if (context.getResultStats() != null) {
-        taskHistory.setOutputStats(context.getResultStats().getProto());
-      }
-
-      if (hasFetchPhase()) {
-        taskHistory.setTotalFetchCount(fetcherRunners.size());
-        int i = 0;
-        FetcherHistoryProto.Builder builder = FetcherHistoryProto.newBuilder();
-        for (Fetcher fetcher : fetcherRunners) {
-          // TODO store the fetcher histories
-          if (systemConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) {
-            builder.setStartTime(fetcher.getStartTime());
-            builder.setFinishTime(fetcher.getFinishTime());
-            builder.setFileLength(fetcher.getFileLen());
-            builder.setMessageReceivedCount(fetcher.getMessageReceiveCount());
-            builder.setState(fetcher.getState());
-
-            taskHistory.addFetcherHistory(builder.build());
-          }
-          if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED) i++;
-        }
-        taskHistory.setFinishedFetchCount(i);
-      }
-    } catch (Exception e) {
-      LOG.warn(e.getMessage(), e);
-    }
-
-    return taskHistory;
-  }
-
-  public int hashCode() {
-    return context.hashCode();
-  }
-
-  public boolean equals(Object obj) {
-    if (obj instanceof LegacyTaskImpl) {
-      LegacyTaskImpl other = (LegacyTaskImpl) obj;
-      return this.context.equals(other.context);
-    }
-    return false;
-  }
-
-  private FileFragment[] localizeFetchedData(File file, String name, TableMeta meta)
-      throws IOException {
-    Configuration c = new Configuration(systemConf);
-    c.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "file:///");
-    FileSystem fs = FileSystem.get(c);
-    Path tablePath = new Path(file.getAbsolutePath());
-
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus f : fileLists) {
-      if (f.getLen() == 0) {
-        continue;
-      }
-      tablet = new FileFragment(name, f.getPath(), 0l, f.getLen());
-      listTablets.add(tablet);
-    }
-
-    // Special treatment for locally pseudo fetched chunks
-    synchronized (localChunks) {
-      for (FileChunk chunk : localChunks) {
-        if (name.equals(chunk.getEbId())) {
-          tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length());
-          listTablets.add(tablet);
-          LOG.info("One local chunk is added to listTablets");
-        }
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  private class FetchRunner implements Runnable {
-    private final TaskAttemptContext ctx;
-    private final Fetcher fetcher;
-    private int maxRetryNum;
-
-    public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
-      this.ctx = ctx;
-      this.fetcher = fetcher;
-      this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
-    }
-
-    @Override
-    public void run() {
-      int retryNum = 0;
-      int retryWaitTime = 1000; //sec
-
-      try { // for releasing fetch latch
-        while(!context.isStopped() && retryNum < maxRetryNum) {
-          if (retryNum > 0) {
-            try {
-              Thread.sleep(retryWaitTime);
-              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // max 10 seconds
-            } catch (InterruptedException e) {
-              LOG.error(e);
-            }
-            LOG.warn("Retry on the fetch: " + fetcher.getURI() + " (" + retryNum + ")");
-          }
-          try {
-            FileChunk fetched = fetcher.get();
-            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null
-          && fetched.getFile() != null) {
-              if (fetched.fromRemote() == false) {
-          localChunks.add(fetched);
-          LOG.info("Add a new FileChunk to local chunk list");
-              }
-              break;
-            }
-          } catch (Throwable e) {
-            LOG.error("Fetch failed: " + fetcher.getURI(), e);
-          }
-          retryNum++;
-        }
-      } finally {
-        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
-          fetcherFinished(ctx);
-        } else {
-          if (retryNum == maxRetryNum) {
-            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")");
-          }
-          stopScriptExecutors();
-          context.stop(); // retry task
-          ctx.getFetchLatch().countDown();
-        }
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public static float adjustFetchProcess(int totalFetcher, int remainFetcher) {
-    if (totalFetcher > 0) {
-      return ((totalFetcher - remainFetcher) / (float) totalFetcher) * FETCHER_PROGRESS;
-    } else {
-      return 0.0f;
-    }
-  }
-
-  private synchronized void fetcherFinished(TaskAttemptContext ctx) {
-    int fetcherSize = fetcherRunners.size();
-    if(fetcherSize == 0) {
-      return;
-    }
-
-    ctx.getFetchLatch().countDown();
-
-    int remainFetcher = (int) ctx.getFetchLatch().getCount();
-    if (remainFetcher == 0) {
-      context.setFetcherProgress(FETCHER_PROGRESS);
-    } else {
-      context.setFetcherProgress(adjustFetchProcess(fetcherSize, remainFetcher));
-    }
-  }
-
-  private List<Fetcher> getFetchRunners(TaskAttemptContext ctx,
-                                        List<FetchImpl> fetches) throws IOException {
-
-    if (fetches.size() > 0) {
-      Path inputDir = executionBlockContext.getLocalDirAllocator().
-          getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf);
-
-      int i = 0;
-      File storeDir;
-      File defaultStoreFile;
-      FileChunk storeChunk = null;
-      List<Fetcher> runnerList = Lists.newArrayList();
-
-      for (FetchImpl f : fetches) {
-        storeDir = new File(inputDir.toString(), f.getName());
-        if (!storeDir.exists()) {
-          storeDir.mkdirs();
-        }
-
-        for (URI uri : f.getURIs()) {
-          defaultStoreFile = new File(storeDir, "in_" + i);
-          InetAddress address = InetAddress.getByName(uri.getHost());
-
-          WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo();
-          if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) {
-            boolean hasError = false;
-            try {
-              LOG.info("Try to get local file chunk at local host");
-              storeChunk = getLocalStoredFileChunk(uri, systemConf);
-            } catch (Throwable t) {
-              hasError = true;
-            }
-
-            // When a range request is out of range, storeChunk will be NULL. This case is normal state.
-            // So, we should skip and don't need to create storeChunk.
-            if (storeChunk == null && !hasError) {
-              continue;
-            }
-
-            if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1
-                && hasError == false) {
-              storeChunk.setFromRemote(false);
-            } else {
-              storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-              storeChunk.setFromRemote(true);
-            }
-          } else {
-            storeChunk = new FileChunk(defaultStoreFile, 0, -1);
-            storeChunk.setFromRemote(true);
-          }
-
-          // If we decide that intermediate data should be really fetched from a remote host, storeChunk
-          // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it
-          storeChunk.setEbId(f.getName());
-          Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk);
-          LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString());
-          runnerList.add(fetcher);
-          i++;
-        }
-      }
-      ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString()));
-      return runnerList;
-    } else {
-      return Lists.newArrayList();
-    }
-  }
-
-  private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException {
-    // Parse the URI
-    LOG.info("getLocalStoredFileChunk starts");
-    final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters();
-    final List<String> types = params.get("type");
-    final List<String> qids = params.get("qid");
-    final List<String> taskIdList = params.get("ta");
-    final List<String> stageIds = params.get("sid");
-    final List<String> partIds = params.get("p");
-    final List<String> offsetList = params.get("offset");
-    final List<String> lengthList = params.get("length");
-
-    if (types == null || stageIds == null || qids == null || partIds == null) {
-      LOG.error("Invalid URI - Required queryId, type, stage Id, and part id");
-      return null;
-    }
-
-    if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) {
-      LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id");
-      return null;
-    }
-
-    String queryId = qids.get(0);
-    String shuffleType = types.get(0);
-    String sid = stageIds.get(0);
-    String partId = partIds.get(0);
-
-    if (shuffleType.equals("r") && taskIdList == null) {
-      LOG.error("Invalid URI - For range shuffle, taskId is required");
-      return null;
-    }
-    List<String> taskIds = splitMaps(taskIdList);
-
-    FileChunk chunk = null;
-    long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L;
-    long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L;
-
-    LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId
-	+ ", taskIds=" + taskIdList);
-
-    // The working directory of Tajo worker for each query, including stage
-    String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/";
-
-    // If the stage requires a range shuffle
-    if (shuffleType.equals("r")) {
-      String ta = taskIds.get(0);
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) {
-        LOG.warn("Range shuffle - file not exist");
-        return null;
-      }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-	      executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf));
-      String startKey = params.get("start").get(0);
-      String endKey = params.get("end").get(0);
-      boolean last = params.get("final") != null;
-
-      try {
-        chunk = TajoPullServerService.getFileChunks(path, startKey, endKey, last);
-            } catch (Throwable t) {
-        LOG.error("getFileChunks() throws exception");
-        return null;
-      }
-
-      // If the stage requires a hash shuffle or a scattered hash shuffle
-    } else if (shuffleType.equals("h") || shuffleType.equals("s")) {
-      int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf);
-      String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId;
-      if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) {
-        LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath);
-        return null;
-      }
-      Path path = executionBlockContext.getLocalFS().makeQualified(
-        executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf));
-      File file = new File(path.toUri());
-      long startPos = (offset >= 0 && length >= 0) ? offset : 0;
-      long readLen = (offset >= 0 && length >= 0) ? length : file.length();
-
-      if (startPos >= file.length()) {
-        LOG.error("Start pos[" + startPos + "] great than file length [" + file.length() + "]");
-        return null;
-      }
-      chunk = new FileChunk(file, startPos, readLen);
-
-    } else {
-      LOG.error("Unknown shuffle type");
-      return null;
-    }
-
-    return chunk;
-  }
-
-  private List<String> splitMaps(List<String> mapq) {
-    if (null == mapq) {
-      return null;
-    }
-    final List<String> ret = new ArrayList<String>();
-    for (String s : mapq) {
-      Collections.addAll(ret, s.split(","));
-    }
-    return ret;
-  }
-
-  public static Path getTaskAttemptDir(TaskAttemptId quid) {
-    Path workDir =
-        StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()),
-            String.valueOf(quid.getTaskId().getId()),
-            String.valueOf(quid.getId()));
-    return workDir;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
index e763d13..0580ebc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java
@@ -30,35 +30,38 @@ import org.apache.tajo.resource.NodeResources;
 import org.apache.tajo.storage.DiskUtil;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.event.*;
 
-import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ResourceProtos.*;
 
 public class NodeResourceManager extends AbstractService implements EventHandler<NodeResourceEvent> {
   private static final Log LOG = LogFactory.getLog(NodeResourceManager.class);
 
   private final Dispatcher dispatcher;
-  private final EventHandler taskEventHandler;
+  private final TajoWorker.WorkerContext workerContext;
+  private final AtomicInteger runningQueryMasters = new AtomicInteger(0);
   private NodeResource totalResource;
   private NodeResource availableResource;
   private TajoConf tajoConf;
+  private boolean enableTest;
 
-  public NodeResourceManager(Dispatcher dispatcher, EventHandler taskEventHandler) {
+  public NodeResourceManager(Dispatcher dispatcher, TajoWorker.WorkerContext workerContext) {
     super(NodeResourceManager.class.getName());
     this.dispatcher = dispatcher;
-    this.taskEventHandler = taskEventHandler;
+    this.workerContext = workerContext;
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
-    this.tajoConf = (TajoConf)conf;
+    this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     this.totalResource = createWorkerResource(tajoConf);
     this.availableResource = NodeResources.clone(totalResource);
     this.dispatcher.register(NodeResourceEvent.EventType.class, this);
-
+    validateConf(tajoConf);
+    this.enableTest = conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE");
     super.serviceInit(conf);
     LOG.info("Initialized NodeResourceManager for " + totalResource);
   }
@@ -66,63 +69,85 @@ public class NodeResourceManager extends AbstractService implements EventHandler
   @Override
   public void handle(NodeResourceEvent event) {
 
-    if (event instanceof NodeResourceAllocateEvent) {
-      NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event;
-      BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder();
-      for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) {
-        NodeResource resource = new NodeResource(request.getResource());
-        if (allocate(resource)) {
-          if(allocateEvent.getRequest().hasExecutionBlockRequest()){
-            //send ExecutionBlock start event to TaskManager
-            startExecutionBlock(allocateEvent.getRequest().getExecutionBlockRequest());
+    switch (event.getType()) {
+      case ALLOCATE: {
+        if (event.getResourceType() == NodeResourceEvent.ResourceType.TASK) {
+          // allocate task resource
+          NodeResourceAllocateEvent allocateEvent = TUtil.checkTypeAndGet(event, NodeResourceAllocateEvent.class);
+          BatchAllocationResponse.Builder response = BatchAllocationResponse.newBuilder();
+          for (TaskAllocationProto request : allocateEvent.getRequest().getTaskRequestList()) {
+            NodeResource resource = new NodeResource(request.getResource());
+            if (allocate(resource)) {
+              //send task start event to TaskExecutor
+              startTask(request.getTaskRequest(), resource);
+            } else {
+              // reject the exceeded requests
+              response.addCancellationTask(request);
+            }
+          }
+          allocateEvent.getCallback().run(response.build());
+
+        } else if (event.getResourceType() == NodeResourceEvent.ResourceType.QUERY_MASTER) {
+          QMResourceAllocateEvent allocateEvent = TUtil.checkTypeAndGet(event, QMResourceAllocateEvent.class);
+          // allocate query master resource
+
+          NodeResource resource = new NodeResource(allocateEvent.getRequest().getResource());
+          if (allocate(resource)) {
+            allocateEvent.getCallback().run(TajoWorker.TRUE_PROTO);
+            runningQueryMasters.incrementAndGet();
+          } else {
+            allocateEvent.getCallback().run(TajoWorker.FALSE_PROTO);
           }
-
-          //send task start event to TaskExecutor
-          startTask(request.getTaskRequest(), resource);
-        } else {
-          // reject the exceeded requests
-          response.addCancellationTask(request);
         }
+        break;
       }
-      allocateEvent.getCallback().run(response.build());
+      case DEALLOCATE: {
+        NodeResourceDeallocateEvent deallocateEvent = TUtil.checkTypeAndGet(event, NodeResourceDeallocateEvent.class);
+        release(deallocateEvent.getResource());
 
-    } else if (event instanceof NodeResourceDeallocateEvent) {
-      NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event;
-      release(deallocateEvent.getResource());
-
-      // send current resource to ResourceTracker
-      getDispatcher().getEventHandler().handle(
-          new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
+        if (deallocateEvent.getResourceType() == NodeResourceEvent.ResourceType.QUERY_MASTER) {
+          runningQueryMasters.decrementAndGet();
+        }
+        // send current resource to ResourceTracker
+        getDispatcher().getEventHandler().handle(
+            new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE));
+        break;
+      }
     }
   }
 
-  protected Dispatcher getDispatcher() {
+  public Dispatcher getDispatcher() {
     return dispatcher;
   }
 
-  protected NodeResource getTotalResource() {
+  public NodeResource getTotalResource() {
     return totalResource;
   }
 
-  protected NodeResource getAvailableResource() {
+  public NodeResource getAvailableResource() {
     return availableResource;
   }
 
+  public int getRunningQueryMasters() {
+    return runningQueryMasters.get();
+  }
+
   private boolean allocate(NodeResource resource) {
-    //TODO consider the jvm free memory
-    if (NodeResources.fitsIn(resource, availableResource)) {
+
+    if (NodeResources.fitsIn(resource, availableResource) && checkFreeHeapMemory(resource)) {
       NodeResources.subtractFrom(availableResource, resource);
       return true;
     }
     return false;
   }
 
-  protected void startExecutionBlock(RunExecutionBlockRequestProto request) {
-    taskEventHandler.handle(new ExecutionBlockStartEvent(request));
+  private boolean checkFreeHeapMemory(NodeResource resource) {
+    //TODO consider the jvm free memory
+    return true;
   }
 
   protected void startTask(TaskRequestProto request, NodeResource resource) {
-    taskEventHandler.handle(new TaskStartEvent(request, resource));
+    workerContext.getTaskManager().getDispatcher().getEventHandler().handle(new TaskStartEvent(request, resource));
   }
 
   private void release(NodeResource resource) {
@@ -130,17 +155,19 @@ public class NodeResourceManager extends AbstractService implements EventHandler
   }
 
   private NodeResource createWorkerResource(TajoConf conf) {
-    int memoryMb;
 
-    if (conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
-      memoryMb = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
-    } else {
-      memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB),
-          conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB));
+    int memoryMb = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+    if (!enableTest) {
+      // Set memory resource to max heap
+      int maxHeap = (int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB);
+      if(maxHeap > memoryMb) {
+        memoryMb = maxHeap;
+        conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, memoryMb);
+      }
     }
 
     int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
-    int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM);
+    int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
 
     int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize();
     if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) {
@@ -150,4 +177,23 @@ public class NodeResourceManager extends AbstractService implements EventHandler
     int diskParallels = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM);
     return NodeResource.createResource(memoryMb, disks * diskParallels, vCores);
   }
+
+  private void validateConf(TajoConf conf) {
+    // validate node memory allocation setting
+    int minMem = conf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY);
+    int minQMMem = conf.getIntVar(TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY);
+    int maxMem = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+
+    if (minMem <= 0 || minQMMem <= 0 || minMem + minQMMem > maxMem) {
+      throw new RuntimeException("Invalid resource worker memory"
+          + " allocation configuration"
+          + ", " + TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY.varname
+          + "=" + minMem
+          + ", " + TajoConf.ConfVars.QUERYMASTER_MINIMUM_MEMORY.varname
+          + "=" + minQMMem
+          + ", " + TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB.varname
+          + "=" + maxMem + ", min and max should be greater than 0"
+          + ", max should be no smaller than min.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
index d13cd50..5d91cc6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java
@@ -27,11 +27,15 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.resource.NodeResource;
-import org.apache.tajo.rpc.*;
+import org.apache.tajo.resource.DefaultResourceCalculator;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.rpc.AsyncRpcClient;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.event.NodeStatusEvent;
 
 import java.net.ConnectException;
@@ -42,8 +46,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-
-import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*;
+import static org.apache.tajo.ResourceProtos.*;
 
 /**
  * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
@@ -55,39 +58,42 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
   private TajoConf tajoConf;
   private StatusUpdaterThread updaterThread;
   private volatile boolean isStopped;
-  private volatile long heartBeatInterval;
+  private int heartBeatInterval;
+  private int nextHeartBeatInterval;
   private BlockingQueue<NodeStatusEvent> heartBeatRequestQueue;
   private final TajoWorker.WorkerContext workerContext;
-  private final NodeResourceManager nodeResourceManager;
   private AsyncRpcClient rmClient;
   private ServiceTracker serviceTracker;
-  private TajoResourceTrackerProtocolService.Interface resourceTracker;
-  private int queueingLimit;
+  private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface resourceTracker;
+  private int queueingThreshold;
 
-  public NodeStatusUpdater(TajoWorker.WorkerContext workerContext, NodeResourceManager resourceManager) {
+  public NodeStatusUpdater(TajoWorker.WorkerContext workerContext) {
     super(NodeStatusUpdater.class.getSimpleName());
     this.workerContext = workerContext;
-    this.nodeResourceManager = resourceManager;
   }
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
-    }
-    this.tajoConf = (TajoConf) conf;
+
+    this.tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue();
     this.serviceTracker = ServiceTrackerFactory.get(tajoConf);
-    this.nodeResourceManager.getDispatcher().register(NodeStatusEvent.EventType.class, this);
-    this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL);
+    this.workerContext.getNodeResourceManager().getDispatcher().register(NodeStatusEvent.EventType.class, this);
+    this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_IDLE_INTERVAL);
     this.updaterThread = new StatusUpdaterThread();
+    this.updaterThread.setName("NodeStatusUpdater");
     super.serviceInit(conf);
   }
 
   @Override
   public void serviceStart() throws Exception {
     // if resource changed over than 50%, send reports
-    this.queueingLimit = nodeResourceManager.getTotalResource().getVirtualCores() / 2;
+    DefaultResourceCalculator calculator = new DefaultResourceCalculator();
+    int maxContainer = calculator.computeAvailableContainers(workerContext.getNodeResourceManager().getTotalResource(),
+        NodeResources.createResource(tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY), 1));
+
+    this.queueingThreshold = Math.max((int) Math.floor(maxContainer * 0.5), 1);
+    LOG.info("Queueing threshold:" + queueingThreshold);
 
     updaterThread.start();
     super.serviceStart();
@@ -97,10 +103,8 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
   @Override
   public void serviceStop() throws Exception {
     this.isStopped = true;
-
     synchronized (updaterThread) {
       updaterThread.interrupt();
-      updaterThread.join();
     }
     super.serviceStop();
     LOG.info("NodeStatusUpdater stopped.");
@@ -115,35 +119,30 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
     return heartBeatRequestQueue.size();
   }
 
-  public int getQueueingLimit() {
-    return queueingLimit;
+  public int getQueueingThreshold() {
+    return queueingThreshold;
   }
 
-  private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) {
-    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
-    requestProto.setAvailableResource(resource.getProto());
+  private NodeHeartbeatRequest.Builder createResourceReport() {
+    NodeHeartbeatRequest.Builder requestProto = NodeHeartbeatRequest.newBuilder();
     requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
-    return requestProto.build();
-  }
+    requestProto.setAvailableResource(workerContext.getNodeResourceManager().getAvailableResource().getProto());
+    requestProto.setRunningTasks(workerContext.getTaskManager().getRunningTasks());
+    requestProto.setRunningQueryMasters(workerContext.getNodeResourceManager().getRunningQueryMasters());
 
-  private NodeHeartbeatRequestProto createHeartBeatReport() {
-    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
-    requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
-    return requestProto.build();
+    return requestProto;
   }
 
-  private NodeHeartbeatRequestProto createNodeStatusReport() {
-    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
-    requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto());
-    requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto());
-    requestProto.setWorkerId(workerContext.getConnectionInfo().getId());
+  private NodeHeartbeatRequest.Builder createNodeStatusReport() {
+    NodeHeartbeatRequest.Builder requestProto = createResourceReport();
+    requestProto.setTotalResource(workerContext.getNodeResourceManager().getTotalResource().getProto());
     requestProto.setConnectionInfo(workerContext.getConnectionInfo().getProto());
 
     //TODO set node status to requestProto.setStatus()
-    return requestProto.build();
+    return requestProto;
   }
 
-  protected TajoResourceTrackerProtocolService.Interface newStub()
+  protected TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService.Interface newStub()
       throws NoSuchMethodException, ConnectException, ClassNotFoundException {
     RpcClientManager.cleanup(rmClient);
 
@@ -154,15 +153,15 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
     return rmClient.getStub();
   }
 
-  protected NodeHeartbeatResponseProto sendHeartbeat(NodeHeartbeatRequestProto requestProto)
+  protected NodeHeartbeatResponse sendHeartbeat(NodeHeartbeatRequest requestProto)
       throws NoSuchMethodException, ClassNotFoundException, ConnectException, ExecutionException {
     if (resourceTracker == null) {
       resourceTracker = newStub();
     }
 
-    NodeHeartbeatResponseProto response = null;
+    NodeHeartbeatResponse response = null;
     try {
-      CallFuture<NodeHeartbeatResponseProto> callBack = new CallFuture<NodeHeartbeatResponseProto>();
+      CallFuture<NodeHeartbeatResponse> callBack = new CallFuture<NodeHeartbeatResponse>();
 
       resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack);
       response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -190,7 +189,6 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
       long deadline = System.nanoTime() + unit.toNanos(timeout);
       int added = 0;
       while (added < numElements) {
-        added += heartBeatRequestQueue.drainTo(buffer, numElements - added);
         if (added < numElements) { // not enough elements immediately available; will have to wait
           NodeStatusEvent e = heartBeatRequestQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
           if (e == null) {
@@ -200,6 +198,7 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
           added++;
 
           if (e.getType() == NodeStatusEvent.EventType.FLUSH_REPORTS) {
+            added += heartBeatRequestQueue.drainTo(buffer, numElements - added);
             break;
           }
         }
@@ -210,37 +209,39 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
     /* Node sends a heartbeats with its resource and status periodically to master. */
     @Override
     public void run() {
-      NodeHeartbeatResponseProto lastResponse = null;
+      NodeHeartbeatResponse lastResponse = null;
       while (!isStopped && !Thread.interrupted()) {
 
         try {
           if (lastResponse != null) {
             if (lastResponse.getCommand() == ResponseCommand.NORMAL) {
               List<NodeStatusEvent> events = Lists.newArrayList();
+
+              if(lastResponse.hasHeartBeatInterval()) {
+                nextHeartBeatInterval = lastResponse.getHeartBeatInterval();
+              } else {
+                nextHeartBeatInterval = heartBeatInterval;
+              }
+
               try {
                 /* batch update to ResourceTracker */
-                drain(events, Math.max(queueingLimit, 1), heartBeatInterval, TimeUnit.MILLISECONDS);
+                drain(events, queueingThreshold, nextHeartBeatInterval, TimeUnit.MILLISECONDS);
               } catch (InterruptedException e) {
                 break;
               }
 
-              if (!events.isEmpty()) {
-                // send current available resource;
-                lastResponse = sendHeartbeat(createResourceReport(nodeResourceManager.getAvailableResource()));
-              } else {
-                // send ping;
-                lastResponse = sendHeartbeat(createHeartBeatReport());
-              }
+              // send current available resource;
+              lastResponse = sendHeartbeat(createResourceReport().build());
 
             } else if (lastResponse.getCommand() == ResponseCommand.MEMBERSHIP) {
               // Membership changed
-              lastResponse = sendHeartbeat(createNodeStatusReport());
+              lastResponse = sendHeartbeat(createNodeStatusReport().build());
             } else if (lastResponse.getCommand() == ResponseCommand.ABORT_QUERY) {
               //TODO abort failure queries
             }
           } else {
             // Node registration on startup
-            lastResponse = sendHeartbeat(createNodeStatusReport());
+            lastResponse = sendHeartbeat(createNodeStatusReport().build());
           }
         } catch (NoSuchMethodException nsme) {
           LOG.fatal(nsme.getMessage(), nsme);
@@ -249,15 +250,10 @@ public class NodeStatusUpdater extends AbstractService implements EventHandler<N
           LOG.fatal(cnfe.getMessage(), cnfe);
           Runtime.getRuntime().halt(-1);
         } catch (Exception e) {
-          LOG.error(e.getMessage(), e);
-          if (!isStopped) {
-            synchronized (updaterThread) {
-              try {
-                updaterThread.wait(heartBeatInterval);
-              } catch (InterruptedException ie) {
-                // Do Nothing
-              }
-            }
+          if (isStopped) {
+              break;
+          } else {
+            LOG.error(e.getMessage(), e);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
deleted file mode 100644
index b713e70..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.master.container.TajoContainerId;
-
-public interface ResourceAllocator {
-  public void allocateTaskWorker();
-  public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerId);
-  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
-                                           int numTasks, int memoryMBPerTask);
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
deleted file mode 100644
index 05dd1a9..0000000
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ /dev/null
@@ -1,415 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.worker;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ContainerProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.*;
-import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.*;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.container.TajoContainer;
-import org.apache.tajo.master.container.TajoContainerId;
-import org.apache.tajo.master.event.ContainerAllocationEvent;
-import org.apache.tajo.master.event.ContainerAllocatorEventType;
-import org.apache.tajo.master.event.StageContainerAllocationEvent;
-import org.apache.tajo.master.rm.TajoWorkerContainer;
-import org.apache.tajo.master.rm.TajoWorkerContainerId;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
-import org.apache.tajo.querymaster.QueryMasterTask;
-import org.apache.tajo.querymaster.Stage;
-import org.apache.tajo.querymaster.StageState;
-import org.apache.tajo.rpc.CallFuture;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.NullCallback;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.util.ApplicationIdUtils;
-
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class TajoResourceAllocator extends AbstractResourceAllocator {
-  private static final Log LOG = LogFactory.getLog(TajoResourceAllocator.class);
-
-  private TajoConf tajoConf;
-  private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
-  private final ExecutorService allocationExecutor;
-  private final Deallocator deallocator;
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
-    this.queryTaskContext = queryTaskContext;
-    allocationExecutor = Executors.newFixedThreadPool(
-      queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
-    deallocator = new Deallocator();
-  }
-
-  @Override
-  public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerIdProto) {
-    TajoWorkerContainerId containerId = new TajoWorkerContainerId();
-    ApplicationAttemptId appAttemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId());
-    containerId.setApplicationAttemptId(appAttemptId);
-    containerId.setId(containerIdProto.getId());
-    return containerId;
-  }
-
-  @Override
-  public void allocateTaskWorker() {
-  }
-
-  @Override
-  public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
-                                           int numTasks,
-                                           int memoryMBPerTask) {
-    //TODO consider disk slot
-
-    ClusterResourceSummary clusterResource = workerContext.getClusterResource();
-    int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
-    clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
-    LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
-      ", Number of Cluster Slots=" + clusterSlots);
-    return  Math.min(numTasks, clusterSlots);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("conf should be a TajoConf type.");
-    }
-    tajoConf = (TajoConf)conf;
-
-    queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
-
-    queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
-
-    deallocator.start();
-
-    super.init(conf);
-  }
-
-  @Override
-  public synchronized void stop() {
-    if (stopped.compareAndSet(false, true)) {
-      return;
-    }
-
-    allocationExecutor.shutdownNow();
-    deallocator.shutdown();
-
-    Map<TajoContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator()
-      .getContainers();
-    List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
-    for(ContainerProxy eachProxy: list) {
-      try {
-        eachProxy.stopContainer();
-      } catch (Throwable e) {
-        LOG.warn(e.getMessage(), e);
-      }
-    }
-
-    workerInfoMap.clear();
-    super.stop();
-  }
-
-  @Override
-  public void start() {
-    super.start();
-  }
-
-  class TajoTaskRunnerLauncher implements TaskRunnerLauncher {
-    @Override
-    public void handle(TaskRunnerGroupEvent event) {
-      if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_LAUNCH) {
-        if (!(event instanceof LaunchTaskRunnersEvent)) {
-          throw new IllegalArgumentException("event should be a LaunchTaskRunnersEvent type.");
-        }
-        LaunchTaskRunnersEvent launchEvent = (LaunchTaskRunnersEvent) event;
-        launchTaskRunners(launchEvent);
-      } else if (event.getType() == TaskRunnerGroupEvent.EventType.CONTAINER_REMOTE_CLEANUP) {
-        stopContainers(event.getContainers());
-        stopExecutionBlock(event.getExecutionBlockId(), event.getContainers());
-      }
-    }
-  }
-
-  private void launchTaskRunners(LaunchTaskRunnersEvent event) {
-    // Query in standby mode doesn't need launch Worker.
-    // But, Assign ExecutionBlock to assigned tajo worker
-    for(TajoContainer eachContainer: event.getContainers()) {
-      TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
-        eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
-      allocationExecutor.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
-    }
-  }
-
-  public void stopExecutionBlock(final ExecutionBlockId executionBlockId,
-                                 Collection<TajoContainer> containers) {
-    Set<NodeId> workers = Sets.newHashSet();
-    for (TajoContainer container : containers){
-      workers.add(container.getNodeId());
-    }
-
-    for (final NodeId worker : workers) {
-      allocationExecutor.submit(new Runnable() {
-        @Override
-        public void run() {
-          stopExecutionBlock(executionBlockId, worker);
-        }
-      });
-    }
-  }
-
-  private void stopExecutionBlock(ExecutionBlockId executionBlockId, NodeId worker) {
-    NettyClientBase tajoWorkerRpc = null;
-    try {
-      InetSocketAddress addr = new InetSocketAddress(worker.getHost(), worker.getPort());
-      tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true);
-      TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub();
-
-      tajoWorkerRpcClient.stopExecutionBlock(null, executionBlockId.getProto(),
-          NullCallback.get(PrimitiveProtos.BoolProto.class));
-    } catch (Throwable e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  protected static class LaunchRunner implements Runnable {
-    private final ContainerProxy proxy;
-    private final TajoContainerId id;
-    public LaunchRunner(TajoContainerId id, ContainerProxy proxy) {
-      this.proxy = proxy;
-      this.id = id;
-    }
-    @Override
-    public void run() {
-      proxy.launch(null);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("ContainerProxy started:" + id);
-      }
-    }
-  }
-
-  private void stopContainers(Collection<TajoContainer> containers) {
-    deallocator.submit(Iterables.transform(containers, new Function<TajoContainer, TajoContainerId>() {
-      public TajoContainerId apply(TajoContainer input) { return input.getId(); }
-    }));
-  }
-
-  private static final TajoContainerId FIN = new TajoWorkerContainerId();
-
-  private class Deallocator extends Thread {
-
-    private final BlockingDeque<TajoContainerId> queue = new LinkedBlockingDeque<TajoContainerId>();
-
-    public Deallocator() {
-      setName("Deallocator");
-      setDaemon(true);
-    }
-
-    private void submit(Iterable<TajoContainerId> container) {
-      queue.addAll(Lists.newArrayList(container));
-    }
-
-    private void shutdown() {
-      queue.add(FIN);
-    }
-
-    @Override
-    public void run() {
-      final AbstractResourceAllocator allocator = queryTaskContext.getResourceAllocator();
-      while (!stopped.get() || !queue.isEmpty()) {
-        TajoContainerId containerId;
-        try {
-          containerId = queue.take();
-        } catch (InterruptedException e) {
-          continue;
-        }
-        if (containerId == FIN) {
-          break;
-        }
-        ContainerProxy proxy = allocator.getContainer(containerId);
-        if (proxy == null) {
-          continue;
-        }
-        try {
-          LOG.info("Stopping ContainerProxy: " + proxy.getContainerId() + "," + proxy.getBlockId());
-          proxy.stopContainer();
-        } catch (Exception e) {
-          LOG.warn("Failed to stop container " + proxy.getContainerId() + "," + proxy.getBlockId(), e);
-        }
-      }
-      LOG.info("Deallocator exiting");
-    }
-  }
-
-  class TajoWorkerAllocationHandler implements EventHandler<ContainerAllocationEvent> {
-    @Override
-    public void handle(ContainerAllocationEvent event) {
-      allocationExecutor.submit(new TajoWorkerAllocationThread(event));
-    }
-  }
-
-  class TajoWorkerAllocationThread extends Thread {
-    ContainerAllocationEvent event;
-    TajoWorkerAllocationThread(ContainerAllocationEvent event) {
-      this.event = event;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Start TajoWorkerAllocationThread");
-      CallFuture<WorkerResourceAllocationResponse> callBack =
-        new CallFuture<WorkerResourceAllocationResponse>();
-
-      //TODO consider task's resource usage pattern
-      int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
-      float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
-
-      WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder()
-          .setMinMemoryMBPerContainer(requiredMemoryMB)
-          .setMaxMemoryMBPerContainer(requiredMemoryMB)
-          .setNumContainers(event.getRequiredNum())
-          .setResourceRequestPriority(!event.isLeafQuery() ?
-              ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK)
-          .setMinDiskSlotPerContainer(requiredDiskSlots)
-          .setMaxDiskSlotPerContainer(requiredDiskSlots)
-          .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
-          .build();
-
-
-      NettyClientBase tmClient = null;
-      try {
-        ServiceTracker serviceTracker = queryTaskContext.getQueryMasterContext().getWorkerContext().getServiceTracker();
-        tmClient = RpcClientManager.getInstance().
-            getClient(serviceTracker.getUmbilicalAddress(), QueryCoordinatorProtocol.class, true);
-        QueryCoordinatorProtocolService masterClientService = tmClient.getStub();
-        masterClientService.allocateWorkerResources(callBack.getController(), request, callBack);
-      } catch (Throwable e) {
-        LOG.error(e.getMessage(), e);
-      }
-
-      WorkerResourceAllocationResponse response = null;
-      while(!stopped.get()) {
-        try {
-          response = callBack.get(3, TimeUnit.SECONDS);
-          break;
-        } catch (InterruptedException e) {
-          if(stopped.get()) {
-            return;
-          }
-        } catch (TimeoutException e) {
-          LOG.info("No available worker resource for " + event.getExecutionBlockId());
-          continue;
-        } catch (ExecutionException e) {
-          LOG.error(e.getMessage(), e);
-          break;
-        }
-      }
-
-      int numAllocatedContainers = 0;
-
-      if(response != null) {
-        List<WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
-        ExecutionBlockId executionBlockId = event.getExecutionBlockId();
-
-        List<TajoContainer> containers = new ArrayList<TajoContainer>();
-        for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
-          TajoWorkerContainer container = new TajoWorkerContainer();
-          NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
-            eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
-
-          TajoWorkerContainerId containerId = new TajoWorkerContainerId();
-
-          containerId.setApplicationAttemptId(
-            ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
-              eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
-          containerId.setId(eachAllocatedResource.getContainerId().getId());
-
-          container.setId(containerId);
-          container.setNodeId(nodeId);
-
-
-          WorkerResource workerResource = new WorkerResource();
-          workerResource.setMemoryMB(eachAllocatedResource.getAllocatedMemoryMB());
-          workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
-
-          Worker worker = new Worker(null, workerResource,
-            new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
-          container.setWorkerResource(worker);
-          addWorkerConnectionInfo(worker.getConnectionInfo());
-          containers.add(container);
-        }
-
-        StageState state = queryTaskContext.getStage(executionBlockId).getSynchronizedState();
-        if (!Stage.isRunningState(state)) {
-          List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>();
-          for(TajoContainer eachContainer: containers) {
-            containerIds.add(eachContainer.getId());
-          }
-          try {
-            TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
-          } catch (Throwable e) {
-            deallocator.submit(containerIds);
-            LOG.error(e.getMessage(), e);
-          }
-          return;
-        }
-
-        if (allocatedResources.size() > 0) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("StageContainerAllocationEvent fire:" + executionBlockId);
-          }
-          queryTaskContext.getEventHandler().handle(new StageContainerAllocationEvent(executionBlockId, containers));
-        }
-        numAllocatedContainers += allocatedResources.size();
-
-      }
-      if(event.getRequiredNum() > numAllocatedContainers) {
-        ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
-          event.getType(), event.getExecutionBlockId(), event.getPriority(),
-          event.getResource(),
-          event.getRequiredNum() - numAllocatedContainers,
-          event.isLeafQuery(), event.getProgress()
-        );
-        queryTaskContext.getEventHandler().handle(shortRequestEvent);
-
-      }
-      LOG.info("Stop TajoWorkerAllocationThread");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 66c8e4a..a95698f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -42,9 +42,7 @@ import org.apache.tajo.rpc.RpcConstants;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.service.TajoMasterInfo;
-import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
-import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.querymaster.QueryMaster;
 import org.apache.tajo.querymaster.QueryMasterManagerService;
@@ -70,7 +68,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.conf.TajoConf.ConfVars;
 
@@ -97,19 +94,19 @@ public class TajoWorker extends CompositeService {
 
   private WorkerContext workerContext;
 
-  private TaskRunnerManager taskRunnerManager;
+  private TaskManager taskManager;
+
+  private TaskExecutor taskExecutor;
 
   private TajoPullServerService pullService;
 
   private ServiceTracker serviceTracker;
 
-  private WorkerHeartbeatService workerHeartbeatThread;
-
-  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private NodeResourceManager nodeResourceManager;
 
-  private AtomicInteger numClusterNodes = new AtomicInteger();
+  private NodeStatusUpdater nodeStatusUpdater;
 
-  private ClusterResourceSummary clusterResource;
+  private AtomicBoolean stopped = new AtomicBoolean(false);
 
   private WorkerConnectionInfo connectionInfo;
 
@@ -147,12 +144,9 @@ public class TajoWorker extends CompositeService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    if (!(conf instanceof TajoConf)) {
-      throw new IllegalArgumentException("conf should be a TajoConf type.");
-    }
     Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
 
-    this.systemConf = (TajoConf)conf;
+    this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     RackResolver.init(systemConf);
 
     RpcClientManager rpcManager = RpcClientManager.getInstance();
@@ -165,23 +159,11 @@ public class TajoWorker extends CompositeService {
     this.workerContext = new TajoWorkerContext();
     this.lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
 
-    String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
-
-    boolean randomPort = true;
-    if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
-      randomPort = false;
-    }
 
     int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
     int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
     int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
 
-    if(randomPort) {
-      clientPort = 0;
-      peerRpcPort = 0;
-      qmManagerPort = 0;
-      systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
-    }
 
     this.dispatcher = new AsyncDispatcher();
     addIfService(dispatcher);
@@ -196,12 +178,19 @@ public class TajoWorker extends CompositeService {
     queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
     addIfService(queryMasterManagerService);
 
-    // taskrunner worker
-    taskRunnerManager = new TaskRunnerManager(workerContext, dispatcher);
-    addService(taskRunnerManager);
+    this.taskManager = new TaskManager(dispatcher, workerContext);
+    addService(taskManager);
 
-    workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
-    addIfService(workerHeartbeatThread);
+    this.taskExecutor = new TaskExecutor(workerContext);
+    addService(taskExecutor);
+
+    AsyncDispatcher rmDispatcher = new AsyncDispatcher();
+    addService(rmDispatcher);
+    this.nodeResourceManager = new NodeResourceManager(rmDispatcher, workerContext);
+    addService(nodeResourceManager);
+
+    this.nodeStatusUpdater = new NodeStatusUpdater(workerContext);
+    addService(nodeStatusUpdater);
 
     int httpPort = 0;
     if(!TajoPullServerService.isStandalone()) {
@@ -268,8 +257,8 @@ public class TajoWorker extends CompositeService {
     workerSystemMetrics.register("task", "runningTasks", new Gauge<Integer>() {
       @Override
       public Integer getValue() {
-        if(taskRunnerManager != null) {
-          return taskRunnerManager.getNumTasks();
+        if(taskExecutor != null) {
+          return taskExecutor.getRunningTasks();
         } else {
           return 0;
         }
@@ -394,7 +383,11 @@ public class TajoWorker extends CompositeService {
 
     QueryMasterManagerService getQueryMasterManagerService();
 
-    TaskRunnerManager getTaskRunnerManager();
+    TaskManager getTaskManager();
+
+    TaskExecutor getTaskExecuor();
+
+    NodeResourceManager getNodeResourceManager();
 
     CatalogService getCatalog();
 
@@ -404,8 +397,6 @@ public class TajoWorker extends CompositeService {
 
     LocalDirAllocator getLocalDirAllocator();
 
-    ClusterResourceSummary getClusterResource();
-
     TajoSystemMetrics getWorkerSystemMetrics();
 
     HashShuffleAppenderManager getHashShuffleAppenderManager();
@@ -417,10 +408,6 @@ public class TajoWorker extends CompositeService {
     void cleanup(String strPath);
 
     void cleanupTemporalDirectories();
-
-    void setClusterResource(ClusterResourceSummary clusterResource);
-
-    void setNumClusterNodes(int numClusterNodes);
   }
 
   class TajoWorkerContext implements WorkerContext {
@@ -443,8 +430,19 @@ public class TajoWorker extends CompositeService {
       return queryMasterManagerService;
     }
 
-    public TaskRunnerManager getTaskRunnerManager() {
-      return taskRunnerManager;
+    @Override
+    public TaskManager getTaskManager(){
+      return taskManager;
+    }
+
+    @Override
+    public TaskExecutor getTaskExecuor() {
+      return taskExecutor;
+    }
+
+    @Override
+    public NodeResourceManager getNodeResourceManager() {
+      return nodeResourceManager;
     }
 
     public CatalogService getCatalog() {
@@ -507,22 +505,6 @@ public class TajoWorker extends CompositeService {
       }
     }
 
-    public void setNumClusterNodes(int numClusterNodes) {
-      TajoWorker.this.numClusterNodes.set(numClusterNodes);
-    }
-
-    public void setClusterResource(ClusterResourceSummary clusterResource) {
-      synchronized (numClusterNodes) {
-        TajoWorker.this.clusterResource = clusterResource;
-      }
-    }
-
-    public ClusterResourceSummary getClusterResource() {
-      synchronized (numClusterNodes) {
-        return TajoWorker.this.clusterResource;
-      }
-    }
-
     public TajoSystemMetrics getWorkerSystemMetrics() {
       return workerSystemMetrics;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index de8afe8..7752211 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -18,26 +18,27 @@
 
 package org.apache.tajo.worker;
 
-import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryId;
-import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.ResourceProtos.BatchAllocationRequest;
+import org.apache.tajo.ResourceProtos.BatchAllocationResponse;
+import org.apache.tajo.ResourceProtos.StopExecutionBlockRequest;
 import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.AsyncRpcServer;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.worker.event.TaskRunnerStartEvent;
-import org.apache.tajo.worker.event.TaskRunnerStopEvent;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.event.ExecutionBlockStopEvent;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.QueryStopEvent;
 
 import java.net.InetSocketAddress;
 
@@ -58,9 +59,9 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
-  public void init(Configuration conf) {
-    Preconditions.checkArgument(conf instanceof TajoConf);
-    TajoConf tajoConf = (TajoConf) conf;
+  public void serviceInit(Configuration conf) throws Exception {
+
+    TajoConf tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class);
     try {
       // Setup RPC server
       InetSocketAddress initIsa =
@@ -81,21 +82,16 @@ public class TajoWorkerManagerService extends CompositeService
     // Get the master address
     LOG.info("TajoWorkerManagerService is bind to " + bindAddr);
     tajoConf.setVar(TajoConf.ConfVars.WORKER_PEER_RPC_ADDRESS, NetUtils.normalizeInetSocketAddress(bindAddr));
-    super.init(tajoConf);
+    super.serviceInit(tajoConf);
   }
 
   @Override
-  public void start() {
-    super.start();
-  }
-
-  @Override
-  public void stop() {
+  public void serviceStop() throws Exception {
     if(rpcServer != null) {
       rpcServer.shutdown();
     }
     LOG.info("TajoWorkerManagerService stopped");
-    super.stop();
+    super.serviceStop();
   }
 
   public InetSocketAddress getBindAddr() {
@@ -110,29 +106,23 @@ public class TajoWorkerManagerService extends CompositeService
   }
 
   @Override
-  public void startExecutionBlock(RpcController controller,
-                                    TajoWorkerProtocol.RunExecutionBlockRequestProto request,
-                                    RpcCallback<PrimitiveProtos.BoolProto> done) {
-    workerContext.getWorkerSystemMetrics().counter("query", "executedExecutionBlocksNum").inc();
-
-    try {
-      workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStartEvent(request));
-      done.run(TajoWorker.TRUE_PROTO);
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-      controller.setFailed(t.getMessage());
-      done.run(TajoWorker.FALSE_PROTO);
-    }
+  public void allocateTasks(RpcController controller,
+                            BatchAllocationRequest request,
+                            RpcCallback<BatchAllocationResponse> done) {
+    workerContext.getWorkerSystemMetrics().counter("query", "allocationRequestNum").inc();
+    workerContext.getNodeResourceManager().getDispatcher().
+        getEventHandler().handle(new NodeResourceAllocateEvent(request, done));
   }
 
   @Override
   public void stopExecutionBlock(RpcController controller,
-                                 TajoIdProtos.ExecutionBlockIdProto requestProto,
+                                 StopExecutionBlockRequest requestProto,
                                  RpcCallback<PrimitiveProtos.BoolProto> done) {
     try {
-      workerContext.getTaskRunnerManager().getEventHandler().handle(new TaskRunnerStopEvent(
-          new ExecutionBlockId(requestProto)
-      ));
+
+      workerContext.getTaskManager().getDispatcher().getEventHandler().handle(
+          new ExecutionBlockStopEvent(requestProto.getExecutionBlockId(), requestProto.getCleanupList()));
+
       done.run(TajoWorker.TRUE_PROTO);
     } catch (Exception e) {
       LOG.error(e.getMessage(), e);
@@ -144,29 +134,18 @@ public class TajoWorkerManagerService extends CompositeService
   @Override
   public void killTaskAttempt(RpcController controller, TajoIdProtos.TaskAttemptIdProto request,
                               RpcCallback<PrimitiveProtos.BoolProto> done) {
-    Task task = workerContext.getTaskRunnerManager().getTaskByTaskAttemptId(new TaskAttemptId(request));
+    //TODO change to async ?
+    Task task = workerContext.getTaskManager().getTaskByTaskAttemptId(new TaskAttemptId(request));
     if(task != null) task.kill();
 
     done.run(TajoWorker.TRUE_PROTO);
   }
 
   @Override
-  public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request,
+  public void stopQuery(RpcController controller, TajoIdProtos.QueryIdProto request,
                       RpcCallback<PrimitiveProtos.BoolProto> done) {
-    workerContext.cleanup(new QueryId(request).toString());
-    done.run(TajoWorker.TRUE_PROTO);
-  }
 
-  @Override
-  public void cleanupExecutionBlocks(RpcController controller,
-                                     TajoWorkerProtocol.ExecutionBlockListProto ebIds,
-                                     RpcCallback<PrimitiveProtos.BoolProto> done) {
-    for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : ebIds.getExecutionBlockIdList()) {
-      String inputDir = ExecutionBlockContext.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
-      workerContext.cleanup(inputDir);
-      String outputDir = ExecutionBlockContext.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString();
-      workerContext.cleanup(outputDir);
-    }
+    workerContext.getTaskManager().getDispatcher().getEventHandler().handle(new QueryStopEvent(new QueryId(request)));
     done.run(TajoWorker.TRUE_PROTO);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index c849940..66216ee 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -18,9 +18,10 @@
 
 package org.apache.tajo.worker;
 
-import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.apache.tajo.ResourceProtos.TaskStatusProto;
 
 import java.io.IOException;
+import java.util.List;
 
 public interface Task {
 
@@ -48,5 +49,9 @@ public interface Task {
 
   ExecutionBlockContext getExecutionBlockContext();
 
-  TajoWorkerProtocol.TaskStatusProto getReport();
+  TaskStatusProto getReport();
+
+  TaskHistory createTaskHistory();
+
+  List<Fetcher> getFetchers();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
index 2576726..761bf52 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java
@@ -20,7 +20,6 @@ package org.apache.tajo.worker;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.TajoProtos;
 
 /**
  * The driver class for Tajo Task processing.


Mime
View raw message