tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [2/2] tajo git commit: TAJO-1482: Cleanup the legacy cluster mode. (jinho)
Date Wed, 01 Apr 2015 06:38:51 GMT
TAJO-1482: Cleanup the legacy cluster mode. (jinho)

Closes #484


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/968633ff
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/968633ff
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/968633ff

Branch: refs/heads/master
Commit: 968633ff06b29101586286980a2099fc3dd4c534
Parents: 487a0e5
Author: Jinho Kim <jhkim@apache.org>
Authored: Wed Apr 1 15:37:56 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Wed Apr 1 15:37:56 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../org/apache/tajo/cli/tools/TajoAdmin.java    |  35 +-
 tajo-client/src/main/proto/ClientProtos.proto   |   4 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   4 +-
 .../tajo/master/TajoMasterClientService.java    |   2 -
 .../NonForwardQueryResultSystemScanner.java     |  47 +-
 .../tajo/master/rm/TajoResourceTracker.java     |  10 +-
 .../java/org/apache/tajo/master/rm/Worker.java  |   4 +-
 .../apache/tajo/master/rm/WorkerResource.java   |  20 -
 .../apache/tajo/querymaster/QueryMaster.java    |   8 +-
 .../java/org/apache/tajo/worker/TajoWorker.java |  81 +--
 .../apache/tajo/worker/TaskRunnerManager.java   |  11 +-
 .../tajo/worker/WorkerHeartbeatService.java     |   9 +-
 .../main/proto/QueryCoordinatorProtocol.proto   |   2 -
 tajo-core/src/main/resources/tajo-default.xml   |  12 -
 .../main/resources/webapps/admin/cluster.jsp    |  19 +-
 .../src/main/resources/webapps/admin/index.jsp  |  26 +-
 .../src/main/resources/webapps/worker/index.jsp |   6 -
 .../org/apache/tajo/MiniTajoYarnCluster.java    | 175 -----
 .../org/apache/tajo/TajoTestingCluster.java     |  62 +-
 .../tajo/master/rm/TestTajoResourceManager.java |  14 +-
 tajo-dist/src/main/bin/start-tajo.sh            |  13 +-
 tajo-dist/src/main/bin/stop-tajo.sh             |  11 +-
 tajo-dist/src/main/bin/tajo                     |   9 -
 tajo-dist/src/main/conf/tajo-env.sh             |   3 -
 .../tajo/pullserver/PullServerAuxService.java   | 655 -------------------
 .../apache/tajo/pullserver/TajoPullServer.java  |   6 +-
 .../tajo/pullserver/TajoPullServerService.java  | 115 ++--
 .../retriever/DirectoryRetriever.java           |   2 +-
 29 files changed, 123 insertions(+), 1244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 4e1a27c..12bf84e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -107,6 +107,8 @@ Release 0.11.0 - unreleased
   
   TASKS
 
+    TAJO-1482: Cleanup the legacy cluster mode. (jinho)
+
     TAJO-1439: Some method name is written wrongly. 
     (Contributed by Jongyoung Park. Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
index 98ccc5f..4f56649 100644
--- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
+++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java
@@ -23,14 +23,16 @@ import org.apache.commons.cli.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TajoProtos;
-import org.apache.tajo.client.*;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
 import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
-import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
@@ -230,24 +232,15 @@ public class TajoAdmin {
     List<WorkerResourceInfo> deadQueryMasters = new ArrayList<WorkerResourceInfo>();
 
     for (WorkerResourceInfo eachWorker : workerList) {
-      if(eachWorker.getQueryMasterMode() == true) {
-        if(eachWorker.getWorkerStatus().equals(WorkerStatus.RUNNING.toString())) {
-          liveQueryMasters.add(eachWorker);
-          runningQueryMasterTasks += eachWorker.getNumQueryMasterTasks();
-        }
-        if(eachWorker.getWorkerStatus().equals(WorkerStatus.LOST.toString())) {
-          deadQueryMasters.add(eachWorker);
-        }
-      }
-
-      if(eachWorker.getTaskRunnerMode() == true) {
-        if(eachWorker.getWorkerStatus().equals(WorkerStatus.RUNNING.toString())) {
-          liveWorkers.add(eachWorker);
-        } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.LOST.toString())) {
-          deadWorkers.add(eachWorker);
-        } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.DECOMMISSIONED.toString())) {
-          decommissionWorkers.add(eachWorker);
-        }
+      if(eachWorker.getWorkerStatus().equals(WorkerStatus.RUNNING.toString())) {
+        liveQueryMasters.add(eachWorker);
+        liveWorkers.add(eachWorker);
+        runningQueryMasterTasks += eachWorker.getNumQueryMasterTasks();
+      } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.LOST.toString())) {
+        deadQueryMasters.add(eachWorker);
+        deadWorkers.add(eachWorker);
+      } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.DECOMMISSIONED.toString())) {
+        decommissionWorkers.add(eachWorker);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 4c359a2..ecb136e 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -189,9 +189,7 @@ message WorkerResourceInfo {
   required int32 numRunningTasks = 11;
   required string workerStatus = 12;
   required int64 lastHeartbeat = 13;
-  required bool queryMasterMode = 14;
-  required bool taskRunnerMode = 15;
-  required int32 numQueryMasterTasks = 16;
+  required int32 numQueryMasterTasks = 14;
 }
 
 message GetClusterInfoResponse {

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index ecdb2ef..53773d8 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -192,11 +192,9 @@ public class TajoConf extends Configuration {
     // for Yarn Resource Manager ----------------------------------------------
 
     /** how many launching TaskRunners in parallel */
-    YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512, Validators.min("64")),
-    YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1),
+    @Deprecated
     YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num",
         Runtime.getRuntime().availableProcessors() * 2),
-    YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8),
 
     // Query Configuration
     QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 418c30b..4fcdc88 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -661,8 +661,6 @@ public class TajoMasterClientService extends AbstractService {
           workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots());
           workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots());
           workerBuilder.setWorkerStatus(worker.getState().toString());
-          workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode());
-          workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode());
           workerBuilder.setMaxHeap(workerResource.getMaxHeap());
           workerBuilder.setFreeHeap(workerResource.getFreeHeap());
           workerBuilder.setTotalHeap(workerResource.getTotalHeap());

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
index c2ccf34..5901aa7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java
@@ -18,33 +18,14 @@
 
 package org.apache.tajo.master.exec;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
+import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.TaskId;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.conf.TajoConf;
@@ -69,14 +50,15 @@ import org.apache.tajo.plan.logical.IndexScanNode;
 import org.apache.tajo.plan.logical.LogicalNode;
 import org.apache.tajo.plan.logical.ScanNode;
 import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 import org.apache.tajo.worker.TaskAttemptContext;
 
-import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.*;
 
 public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner {
   
@@ -539,22 +521,13 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult
   
   private List<Tuple> getClusterInfo(Schema outSchema) {
     Map<Integer, Worker> workerMap = masterContext.getResourceManager().getWorkers();
-    Set<Integer> keySet = workerMap.keySet();
-    List<Tuple> tuples = Collections.emptyList();
+    List<Tuple> tuples;
     List<Worker> queryMasterList = new ArrayList<Worker>();
     List<Worker> workerList = new ArrayList<Worker>();
     
-    for (Integer keyId: keySet) {
-      Worker aWorker = workerMap.get(keyId);
-      WorkerResource aResource = aWorker.getResource();
-      
-      if (aResource.isQueryMasterMode()) {
-        queryMasterList.add(aWorker);
-      }
-      
-      if (aResource.isTaskRunnerMode()) {
-        workerList.add(aWorker);
-      }
+    for (Worker aWorker: workerMap.values()) {
+      queryMasterList.add(aWorker);
+      workerList.add(aWorker);
     }
     
     tuples = new ArrayList<Tuple>(queryMasterList.size() + workerList.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
index 920fd39..334dbf6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.master.rm;
 
-import com.google.common.base.Preconditions;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import org.apache.commons.logging.Log;
@@ -76,7 +75,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
 
   @Override
   public void serviceInit(Configuration conf) {
-    Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance");
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
     TajoConf systemConf = (TajoConf) conf;
 
     String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS);
@@ -180,12 +181,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource
   }
 
   private Worker createWorkerResource(NodeHeartbeat request) {
-    boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue();
-    boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue();
-
     WorkerResource workerResource = new WorkerResource();
-    workerResource.setQueryMasterMode(queryMasterMode);
-    workerResource.setTaskRunnerMode(taskRunnerMode);
 
     if(request.getServerStatus() != null) {
       workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB());

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
index d426e80..a2ab598 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java
@@ -196,9 +196,7 @@ public class Worker implements EventHandler<WorkerEvent>, Comparable<Worker> {
     @Override
     public void transition(Worker worker, WorkerEvent workerEvent) {
 
-      if(worker.getResource().isQueryMasterMode()) {
-        worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId());
-      }
+      worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId());
       LOG.info("Worker with " + worker.getResource() + " is joined to Tajo cluster");
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
index bfe186c..5f2d33c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java
@@ -57,10 +57,6 @@ public class WorkerResource {
   private final Lock rlock = lock.readLock();
   private final Lock wlock = lock.writeLock();
 
-  private boolean queryMasterMode;
-
-  private boolean taskRunnerMode;
-
   private AtomicInteger numQueryMasterTasks = new AtomicInteger(0);
 
   public float getDiskSlots() {
@@ -145,22 +141,6 @@ public class WorkerResource {
     return usedDiskSlots;
   }
 
-  public boolean isQueryMasterMode() {
-    return queryMasterMode;
-  }
-
-  public void setQueryMasterMode(boolean queryMasterMode) {
-    this.queryMasterMode = queryMasterMode;
-  }
-
-  public boolean isTaskRunnerMode() {
-    return taskRunnerMode;
-  }
-
-  public void setTaskRunnerMode(boolean taskRunnerMode) {
-    this.taskRunnerMode = taskRunnerMode;
-  }
-
   public void releaseResource(float diskSlots, int memoryMB) {
     try {
       wlock.lock();

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
index bf23133..9cbfb95 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java
@@ -167,10 +167,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
     super.stop();
 
-    LOG.info("QueryMaster stop");
-    if(queryMasterContext.getWorkerContext().isYarnContainerMode()) {
-      queryMasterContext.getWorkerContext().stopWorker(true);
-    }
+    LOG.info("QueryMaster stopped");
   }
 
   protected void cleanupExecutionBlock(List<TajoIdProtos.ExecutionBlockIdProto> executionBlockIds) {
@@ -379,9 +376,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
           }
         }
       }
-      if(workerContext.isYarnContainerMode()) {
-        stop();
-      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 3c55add..1d0293b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -75,12 +75,6 @@ public class TajoWorker extends CompositeService {
   public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build();
   public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build();
 
-  public static final String WORKER_MODE_YARN_TASKRUNNER = "tr";
-  public static final String WORKER_MODE_YARN_QUERYMASTER = "qm";
-  public static final String WORKER_MODE_STANDBY = "standby";
-  public static final String WORKER_MODE_QUERY_MASTER = "standby-qm";
-  public static final String WORKER_MODE_TASKRUNNER = "standby-tr";
-
   private static final Log LOG = LogFactory.getLog(TajoWorker.class);
 
   private TajoConf systemConf;
@@ -103,15 +97,6 @@ public class TajoWorker extends CompositeService {
 
   private TajoPullServerService pullService;
 
-  @Deprecated
-  private boolean yarnContainerMode;
-
-  @Deprecated
-  private boolean queryMasterMode;
-
-  @Deprecated
-  private boolean taskRunnerMode;
-
   private ServiceTracker serviceTracker;
 
   private WorkerHeartbeatService workerHeartbeatThread;
@@ -151,39 +136,11 @@ public class TajoWorker extends CompositeService {
   public void startWorker(TajoConf systemConf, String[] args) {
     this.systemConf = systemConf;
     this.cmdArgs = args;
-    setWorkerMode(args);
     init(systemConf);
     start();
   }
 
-  private void setWorkerMode(String[] args) {
-    if(args.length < 1) {
-      queryMasterMode = systemConf.getBoolean("tajo.worker.mode.querymaster", true);
-      taskRunnerMode = systemConf.getBoolean("tajo.worker.mode.taskrunner", true);
-    } else {
-      if(WORKER_MODE_STANDBY.equals(args[0])) {
-        queryMasterMode = true;
-        taskRunnerMode = true;
-      } else if(WORKER_MODE_YARN_TASKRUNNER.equals(args[0])) {
-        yarnContainerMode = true;
-        queryMasterMode = true;
-      } else if(WORKER_MODE_YARN_QUERYMASTER.equals(args[0])) {
-        yarnContainerMode = true;
-        taskRunnerMode = true;
-      } else if(WORKER_MODE_QUERY_MASTER.equals(args[0])) {
-        yarnContainerMode = false;
-        queryMasterMode = true;
-      } else {
-        yarnContainerMode = false;
-        taskRunnerMode = true;
-      }
-    }
-    if(!queryMasterMode && !taskRunnerMode) {
-      LOG.fatal("Worker daemon exit cause no worker mode(querymaster/taskrunner) property");
-      System.exit(0);
-    }
-  }
-  
+
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     if (!(conf instanceof TajoConf)) {
@@ -205,6 +162,7 @@ public class TajoWorker extends CompositeService {
     if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
       randomPort = false;
     }
+
     int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
     int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
     int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();
@@ -237,7 +195,7 @@ public class TajoWorker extends CompositeService {
     addIfService(workerHeartbeatThread);
 
     int httpPort = 0;
-    if(taskRunnerMode && !TajoPullServerService.isStandalone()) {
+    if(!TajoPullServerService.isStandalone()) {
       pullService = new TajoPullServerService();
       addIfService(pullService);
     }
@@ -263,8 +221,7 @@ public class TajoWorker extends CompositeService {
         queryMasterManagerService.getBindAddr().getPort(),
         httpPort);
 
-    LOG.info("Tajo Worker is initialized. \r\nQueryMaster=" + queryMasterMode + " TaskRunner=" + taskRunnerMode
-        + " connection :" + connectionInfo.toString());
+    LOG.info("Tajo Worker is initialized." + " connection :" + connectionInfo.toString());
 
     try {
       hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf);
@@ -312,10 +269,6 @@ public class TajoWorker extends CompositeService {
   private int initWebServer() {
     int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
     try {
-      if (queryMasterMode && !taskRunnerMode) {
-        //If QueryMaster and TaskRunner run on single host, http port conflicts
-        httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
-      }
       webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort,
           true, null, systemConf, null);
       webServer.start();
@@ -457,18 +410,7 @@ public class TajoWorker extends CompositeService {
     }
 
     public String getWorkerName() {
-      if (queryMasterMode) {
-        return getQueryMasterManagerService().getHostAndPort();
-      } else {
-        return connectionInfo.getHostAndPeerRpcPort();
-      }
-    }
-
-    public void stopWorker(boolean force) {
-      stop();
-      if (force) {
-        System.exit(0);
-      }
+      return connectionInfo.getHostAndPeerRpcPort();
     }
 
     public LocalDirAllocator getLocalDirAllocator(){
@@ -515,11 +457,6 @@ public class TajoWorker extends CompositeService {
       }
     }
 
-    @Deprecated
-    public boolean isYarnContainerMode() {
-      return yarnContainerMode;
-    }
-
     public void setNumClusterNodes(int numClusterNodes) {
       TajoWorker.this.numClusterNodes.set(numClusterNodes);
     }
@@ -536,14 +473,6 @@ public class TajoWorker extends CompositeService {
       }
     }
 
-    public boolean isQueryMasterMode() {
-      return queryMasterMode;
-    }
-
-    public boolean isTaskRunnerMode() {
-      return taskRunnerMode;
-    }
-
     public TajoSystemMetrics getWorkerSystemMetrics() {
       return workerSystemMetrics;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index a375a31..11a95ce 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.worker;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,7 +62,9 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
 
   @Override
   public void init(Configuration conf) {
-    Preconditions.checkArgument(conf instanceof TajoConf);
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
     tajoConf = (TajoConf)conf;
     dispatcher.register(TaskRunnerEvent.EventType.class, this);
     super.init(tajoConf);
@@ -98,18 +99,12 @@ public class TaskRunnerManager extends CompositeService implements EventHandler<
     }
 
     super.stop();
-    if(workerContext.isYarnContainerMode()) {
-      workerContext.stopWorker(true);
-    }
   }
 
   public void stopTaskRunner(String id) {
     LOG.info("Stop Task:" + id);
     TaskRunner taskRunner = taskRunnerMap.remove(id);
     taskRunner.stop();
-    if(workerContext.isYarnContainerMode()) {
-      stop();
-    }
   }
 
   public Collection<TaskRunner> getTaskRunners() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 5493b37..e9f90ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.worker;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
@@ -26,7 +25,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ha.HAServiceUtil;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto;
 import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse;
@@ -34,7 +32,6 @@ import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.storage.DiskDeviceInfo;
 import org.apache.tajo.storage.DiskMountInfo;
@@ -72,7 +69,9 @@ public class WorkerHeartbeatService extends AbstractService {
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
     this.systemConf = (TajoConf) conf;
 
     connectionPool = RpcConnectionPool.getPool();
@@ -170,8 +169,6 @@ public class WorkerHeartbeatService extends AbstractService {
             .setDiskSlots(workerDiskSlots)
             .setMemoryResourceMB(workerMemoryMB)
             .setJvmHeap(jvmHeap)
-            .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
-            .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
             .build();
 
         NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
index cef385e..2440e2a 100644
--- a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
+++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto
@@ -58,8 +58,6 @@ message ServerStatusProto {
     repeated Disk disk = 4;
     required int32 runningTaskNum = 5;
     required JvmHeap jvmHeap = 6;
-    required BoolProto queryMasterMode = 7;
-    required BoolProto taskRunnerMode = 8;
 }
 
 message TajoHeartbeat {

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml
index 4a92e72..d9c21bb 100644
--- a/tajo-core/src/main/resources/tajo-default.xml
+++ b/tajo-core/src/main/resources/tajo-default.xml
@@ -26,18 +26,6 @@
   </property>
 
   <property>
-    <name>tajo.worker.mode.querymaster</name>
-    <value>true</value>
-    <description></description>
-  </property>
-
-  <property>
-    <name>tajo.worker.mode.taskrunner</name>
-    <value>true</value>
-    <description></description>
-  </property>
-
-  <property>
     <name>tajo.querymaster.task-scheduler</name>
     <value>org.apache.tajo.querymaster.DefaultTaskScheduler</value>
   </property>

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/webapps/admin/cluster.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
index aca1153..816a144 100644
--- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp
@@ -48,26 +48,17 @@
   Set<Worker> deadQueryMasters = new TreeSet<Worker>();
 
   for(Worker eachWorker: workers.values()) {
-    if(eachWorker.getResource().isQueryMasterMode()) {
-      liveQueryMasters.add(eachWorker);
-      runningQueryMasterTasks += eachWorker.getResource().getNumQueryMasterTasks();
-    }
-
-    if(eachWorker.getResource().isTaskRunnerMode()) {
-      liveWorkers.add(eachWorker);
-    }
+    liveQueryMasters.add(eachWorker);
+    liveWorkers.add(eachWorker);
+    runningQueryMasterTasks += eachWorker.getResource().getNumQueryMasterTasks();
   }
 
   for (Worker inactiveWorker : master.getContext().getResourceManager().getInactiveWorkers().values()) {
     WorkerState state = inactiveWorker.getState();
 
     if (state == WorkerState.LOST) {
-      if (inactiveWorker.getResource().isQueryMasterMode()) {
-        deadQueryMasters.add(inactiveWorker);
-      }
-      if (inactiveWorker.getResource().isTaskRunnerMode()) {
-        deadWorkers.add(inactiveWorker);
-      }
+      deadQueryMasters.add(inactiveWorker);
+      deadWorkers.add(inactiveWorker);
     } else if (state == WorkerState.DECOMMISSIONED) {
       decommissionWorkers.add(inactiveWorker);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index 0a0558e..468fc72 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -57,27 +57,19 @@
           master.getContext().getResourceManager().getClusterResourceSummary();
 
   for(Worker eachWorker: workers.values()) {
-    if(eachWorker.getResource().isQueryMasterMode()) {
-      numQueryMasters++;
-      numLiveQueryMasters++;
-      runningQueryMasterTask += eachWorker.getResource().getNumQueryMasterTasks();
-    }
-    if(eachWorker.getResource().isTaskRunnerMode()) {
-      numWorkers++;
-      numLiveWorkers++;
-    }
+    numQueryMasters++;
+    numLiveQueryMasters++;
+    runningQueryMasterTask += eachWorker.getResource().getNumQueryMasterTasks();
+    numWorkers++;
+    numLiveWorkers++;
   }
 
   for (Worker eachWorker : inactiveWorkers.values()) {
     if (eachWorker.getState() == WorkerState.LOST) {
-      if(eachWorker.getResource().isQueryMasterMode()) {
-        numQueryMasters++;
-        numDeadQueryMasters++;
-      }
-      if(eachWorker.getResource().isTaskRunnerMode()) {
-        numWorkers++;
-        numDeadWorkers++;
-      }
+      numQueryMasters++;
+      numDeadQueryMasters++;
+      numWorkers++;
+      numDeadWorkers++;
     } else if(eachWorker.getState() == WorkerState.DECOMMISSIONED) {
       numDecommissionWorkers++;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/webapps/worker/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp
index bb72f9e..bc3cb1e 100644
--- a/tajo-core/src/main/resources/webapps/worker/index.jsp
+++ b/tajo-core/src/main/resources/webapps/worker/index.jsp
@@ -59,7 +59,6 @@
   <hr/>
 
 <%
-if(tajoWorker.getWorkerContext().isQueryMasterMode()) {
   List<QueryMasterTask> queryMasterTasks = JSPUtil.sortQueryMasterTask(tajoWorker.getWorkerContext()
           .getQueryMasterManagerService().getQueryMaster().getQueryMasterTasks(), true);
 
@@ -122,8 +121,6 @@ if(tajoWorker.getWorkerContext().isQueryMasterMode()) {
   <p/>
   <hr/>
 <%
-} // end of QueryMaster
-if(tajoWorker.getWorkerContext().isTaskRunnerMode()) {
   List<TaskRunner> taskRunners = new ArrayList<TaskRunner>(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners());
   JSPUtil.sortTaskRunner(taskRunners);
 %>
@@ -145,9 +142,6 @@ if(tajoWorker.getWorkerContext().isTaskRunnerMode()) {
       }   //end of for
 %>
   </table>
-<%
-} //end of if
-%>
 </div>
 </body>
 </html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
deleted file mode 100644
index 88d913d..0000000
--- a/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.LocalContainerLauncher;
-import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
-import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.util.JarFinder;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
-import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.PullServerAuxService;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-
-/**
- * Configures and starts the Tajo-specific components in the YARN cluster.
- *
- */
-public class MiniTajoYarnCluster extends MiniYARNCluster {
-
-  public static final String APPJAR = JarFinder
-      .getJar(LocalContainerLauncher.class);
-
-  private static final Log LOG = LogFactory.getLog(MiniTajoYarnCluster.class);
-
-  public MiniTajoYarnCluster(String testName) {
-    this(testName, 1);
-  }
-
-  public MiniTajoYarnCluster(String testName, int noOfNMs) {
-    super(testName, noOfNMs, 1, 1);
-  }
-
-  @Override
-  public void init(Configuration conf) {
-
-    conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
-    conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
-
-    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
-    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
-      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
-          "apps_staging_dir/").getAbsolutePath());
-    }
-    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
-
-    try {
-      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
-          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
-      FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
-      if (fc.util().exists(stagingPath)) {
-        LOG.info(stagingPath + " exists! deleting...");
-        fc.delete(stagingPath, true);
-      }
-      LOG.info("mkdir: " + stagingPath);
-      //mkdir the staging directory so that right permissions are set while running as proxy user
-      fc.mkdir(stagingPath, null, true);
-      //mkdir done directory as well
-      String doneDir = JobHistoryUtils
-          .getConfiguredHistoryServerDoneDirPrefix(conf);
-      Path doneDirPath = fc.makeQualified(new Path(doneDir));
-      fc.mkdir(doneDirPath, null, true);
-    } catch (IOException e) {
-      throw new YarnRuntimeException("Could not create staging directory. ", e);
-    }
-    conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of
-    // which shuffle doesn't happen
-    //configure the shuffle service in NM
-    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, PullServerAuxService.PULLSERVER_SERVICEID);
-    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
-        PullServerAuxService.PULLSERVER_SERVICEID), PullServerAuxService.class,
-        Service.class);
-
-    // Non-standard shuffle port
-    conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);
-
-    // local directory
-    conf.set(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.name(), "/tmp/tajo-localdir");
-
-    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
-        DefaultContainerExecutor.class, ContainerExecutor.class);
-
-    // TestMRJobs is for testing non-uberized operation only; see TestUberAM
-    // for corresponding uberized tests.
-    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
-
-    conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600);
-
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    super.start();
-
-    LOG.info("MiniTajoYarn NM Local Dir: " + getConfig().get(YarnConfiguration.NM_LOCAL_DIRS));
-  }
-
-  private class JobHistoryServerWrapper extends AbstractService {
-    public JobHistoryServerWrapper() {
-      super(JobHistoryServerWrapper.class.getName());
-    }
-
-    @Override
-    public synchronized void start() {
-      try {
-        if (!getConfig().getBoolean(
-            JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
-            JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
-          // pick free random ports.
-          getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
-              MiniYARNCluster.getHostname() + ":0");
-          getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
-              MiniYARNCluster.getHostname() + ":0");
-        }
-        super.start();
-      } catch (Throwable t) {
-        throw new YarnRuntimeException(t);
-      }
-
-      LOG.info("MiniMRYARN ResourceManager address: " +
-          getConfig().get(YarnConfiguration.RM_ADDRESS));
-      LOG.info("MiniMRYARN ResourceManager web address: " +
-          getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
-      LOG.info("MiniMRYARN HistoryServer address: " +
-          getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
-      LOG.info("MiniMRYARN HistoryServer web address: " +
-          getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
-    }
-
-    @Override
-    public synchronized void stop() {
-      super.stop();
-    }
-  }
-
-  public static void main(String [] args) {
-    MiniTajoYarnCluster cluster = new MiniTajoYarnCluster(MiniTajoYarnCluster.class.getName());
-    cluster.init(new TajoConf());
-    cluster.start();
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 17348e1..9d9310e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -26,13 +26,10 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.tajo.catalog.*;
@@ -56,9 +53,10 @@ import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.Writer;
 import java.net.InetSocketAddress;
-import java.net.URL;
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.List;
@@ -68,8 +66,6 @@ import java.util.UUID;
 public class TajoTestingCluster {
 	private static Log LOG = LogFactory.getLog(TajoTestingCluster.class);
 	private TajoConf conf;
-
-  protected MiniTajoYarnCluster yarnCluster;
   private FileSystem defaultFS;
   private MiniDFSCluster dfsCluster;
 	private MiniCatalogServer catalogServer;
@@ -77,7 +73,6 @@ public class TajoTestingCluster {
 
   private TajoMaster tajoMaster;
   private List<TajoWorker> tajoWorkers = new ArrayList<TajoWorker>();
-  private boolean standbyWorkerMode = false;
   private boolean isDFSRunning = false;
   private boolean isTajoClusterRunning = false;
   private boolean isCatalogServerRunning = false;
@@ -165,9 +160,6 @@ public class TajoTestingCluster {
     // Memory cache termination
     conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1);
 
-    this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS)
-        .indexOf(TajoWorkerResourceManager.class.getName()) >= 0;
-
     /* Since Travi CI limits the size of standard output log up to 4MB */
     if (!StringUtils.isEmpty(LOG_LEVEL)) {
       Level defaultLevel = Logger.getRootLogger().getLevel();
@@ -370,9 +362,8 @@ public class TajoTestingCluster {
     this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
     this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS));
 
-    if(standbyWorkerMode) {
-      startTajoWorkers(numSlaves);
-    }
+    startTajoWorkers(numSlaves);
+
     isTajoClusterRunning = true;
     LOG.info("Mini Tajo cluster is up");
     LOG.info("====================================================================================");
@@ -434,7 +425,7 @@ public class TajoTestingCluster {
 
       workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0");
       
-      tajoWorker.startWorker(workerConf, new String[]{"standby"});
+      tajoWorker.startWorker(workerConf, new String[0]);
 
       LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
       tajoWorkers.add(tajoWorker);
@@ -512,46 +503,9 @@ public class TajoTestingCluster {
 
     hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir);
 
-    if(!standbyWorkerMode) {
-      startMiniYarnCluster();
-    }
-
     startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false);
   }
 
-  private void startMiniYarnCluster() throws Exception {
-    LOG.info("Starting up YARN cluster");
-    // Scheduler properties required for YARN to work
-    conf.set("yarn.scheduler.capacity.root.queues", "default");
-    conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
-
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 2);
-
-    if (yarnCluster == null) {
-      yarnCluster = new MiniTajoYarnCluster(TajoTestingCluster.class.getName(), 3);
-      yarnCluster.init(conf);
-      yarnCluster.start();
-
-      ResourceManager resourceManager = yarnCluster.getResourceManager();
-      InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress();
-      InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress();
-      conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr));
-      conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr));
-
-      URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
-      if (url == null) {
-        throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
-      }
-      yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent());
-      OutputStream os = new FileOutputStream(new File(url.getPath()));
-      yarnCluster.getConfig().writeXml(os);
-      os.close();
-    }
-  }
-
   public void startMiniClusterInLocal(final int numSlaves) throws Exception {
     isRunningCluster();
 
@@ -580,10 +534,6 @@ public class TajoTestingCluster {
       isCatalogServerRunning = false;
     }
 
-    if(this.yarnCluster != null) {
-      this.yarnCluster.stop();
-    }
-
     try {
       Thread.sleep(3000);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index a013d0b..2c997a3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -49,7 +49,7 @@ public class TestTajoResourceManager {
   int workerMemoryMB = 512 * 10;
   WorkerResourceAllocationResponse response;
 
-  private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception {
+  private TajoWorkerResourceManager initResourceManager() throws Exception {
     tajoConf = new org.apache.tajo.conf.TajoConf();
 
     tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f);
@@ -85,8 +85,6 @@ public class TestTajoResourceManager {
       disks.add(disk);
 
       ServerStatusProto serverStatus = ServerStatusProto.newBuilder()
-          .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE)
-          .setTaskRunnerMode(BOOL_TRUE)
           .setDiskSlots(workerDiskSlots)
           .setMemoryResourceMB(workerMemoryMB)
           .setJvmHeap(jvmHeap)
@@ -113,7 +111,7 @@ public class TestTajoResourceManager {
   public void testHeartbeat() throws Exception {
     TajoWorkerResourceManager tajoWorkerResourceManager = null;
     try {
-      tajoWorkerResourceManager = initResourceManager(false);
+      tajoWorkerResourceManager = initResourceManager();
       assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size());
       for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) {
         WorkerResource resource = worker.getResource();
@@ -131,7 +129,7 @@ public class TestTajoResourceManager {
   public void testMemoryResource() throws Exception {
     TajoWorkerResourceManager tajoWorkerResourceManager = null;
     try {
-      tajoWorkerResourceManager = initResourceManager(false);
+      tajoWorkerResourceManager = initResourceManager();
 
       final int minMemory = 256;
       final int maxMemory = 512;
@@ -215,7 +213,7 @@ public class TestTajoResourceManager {
     TajoWorkerResourceManager tajoWorkerResourceManager = null;
 
     try {
-      tajoWorkerResourceManager = initResourceManager(false);
+      tajoWorkerResourceManager = initResourceManager();
 
       final int minMemory = 200;
       final int maxMemory = 500;
@@ -300,7 +298,7 @@ public class TestTajoResourceManager {
     TajoWorkerResourceManager tajoWorkerResourceManager = null;
 
     try {
-      tajoWorkerResourceManager = initResourceManager(false);
+      tajoWorkerResourceManager = initResourceManager();
 
       final float minDiskSlots = 1.0f;
       final float maxDiskSlots = 2.0f;
@@ -382,7 +380,7 @@ public class TestTajoResourceManager {
     TajoWorkerResourceManager tajoWorkerResourceManager = null;
 
     try {
-      tajoWorkerResourceManager = initResourceManager(false);
+      tajoWorkerResourceManager = initResourceManager();
 
       final float minDiskSlots = 1.0f;
       final float maxDiskSlots = 2.0f;

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/bin/start-tajo.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/start-tajo.sh b/tajo-dist/src/main/bin/start-tajo.sh
index 8fcf1a6..efc0371 100755
--- a/tajo-dist/src/main/bin/start-tajo.sh
+++ b/tajo-dist/src/main/bin/start-tajo.sh
@@ -44,12 +44,9 @@ if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then
   . "${TAJO_CONF_DIR}/tajo-env.sh"
 fi
 
-if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then
-  if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then
-    "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start pullserver
-  fi
-  "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start worker
-  if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then
-    "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start querymaster
-  fi
+
+if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then
+  "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start pullserver
 fi
+
+"$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start worker

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/bin/stop-tajo.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/stop-tajo.sh b/tajo-dist/src/main/bin/stop-tajo.sh
index 770034b..32a4a4c 100755
--- a/tajo-dist/src/main/bin/stop-tajo.sh
+++ b/tajo-dist/src/main/bin/stop-tajo.sh
@@ -44,13 +44,8 @@ if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then
   . "${TAJO_CONF_DIR}/tajo-env.sh"
 fi
 
-if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then
-  "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop worker
-  if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then
-    "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop pullserver
-  fi
-  if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then
-    "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop querymaster
-  fi
+"$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop worker
+if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then
+  "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop pullserver
 fi
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/bin/tajo
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo
index cc611d5..a92d1a9 100755
--- a/tajo-dist/src/main/bin/tajo
+++ b/tajo-dist/src/main/bin/tajo
@@ -390,15 +390,6 @@ elif [ "$COMMAND" = "worker" ] ; then
 elif [ "$COMMAND" = "pullserver" ] ; then
   CLASS='org.apache.tajo.pullserver.TajoPullServer'
   TAJO_OPTS="$TAJO_OPTS $JAVA_PULLSERVER_HEAP_MAX $TAJO_PULLSERVER_OPTS"
-elif [ "$COMMAND" = "querymaster" ] ; then
-  CLASS='org.apache.tajo.worker.TajoWorker'
-  TAJO_OPTS="$TAJO_OPTS $JAVA_QUERYMASTER_HEAP_MAX $TAJO_QUERYMASTER_OPTS"
-  TAJO_DAEMON_MODE='standby-qm'
-elif [ "$COMMAND" = "taskrunner" ] ; then
-  CLASS='org.apache.tajo.worker.TajoWorker'
-  TAJO_OPTS="$TAJO_OPTS $TAJO_WORKER_OPTS"
-  TAJO_OPTS="$TAJO_OPTS $JAVA_WORKER_HEAP_MAX $TAJO_WORKER_OPTS"
-  TAJO_DAEMON_MODE='standby-tr'
 elif [ "$COMMAND" = "catalog" ] ; then
   CLASS='org.apache.tajo.catalog.CatalogServer'
   TAJO_OPTS="$TAJO_OPTS $TAJO_CATALOG_OPTS"

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/conf/tajo-env.sh
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/tajo-env.sh b/tajo-dist/src/main/conf/tajo-env.sh
index cbb881e..d52a827 100755
--- a/tajo-dist/src/main/conf/tajo-env.sh
+++ b/tajo-dist/src/main/conf/tajo-env.sh
@@ -72,9 +72,6 @@
 # The scheduling priority for daemon processes.  See 'man nice'.
 # export TAJO_NICENESS=10
 
-# Tajo cluster mode. the default mode is standby mode.
-export TAJO_WORKER_STANDBY_MODE=true
-
 # It must be required to use HCatalogStore
 # export HIVE_HOME=
 # export HIVE_JDBC_DRIVER_DIR=

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
deleted file mode 100644
index fba0593..0000000
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java
+++ /dev/null
@@ -1,655 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.pullserver;
-
-import com.google.common.collect.Lists;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.*;
-import io.netty.channel.group.ChannelGroup;
-import io.netty.channel.group.DefaultChannelGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.http.*;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedWriteHandler;
-import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.GenericFutureListener;
-import io.netty.util.concurrent.GlobalEventExecutor;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
-import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.pullserver.retriever.FileChunk;
-import org.apache.tajo.rpc.RpcChannelFactory;
-import org.apache.tajo.storage.RowStoreUtil;
-import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.TupleComparator;
-import org.apache.tajo.storage.index.bst.BSTIndex;
-import org.apache.tajo.util.TajoIdUtils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class PullServerAuxService extends AuxiliaryService {
-
-  private static final Log LOG = LogFactory.getLog(PullServerAuxService.class);
-  
-  public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache";
-  public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
-
-  public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes";
-  public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024;
-
-  private int port;
-  private ServerBootstrap selector;
-  private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
-  private HttpChannelInitializer initializer;
-  private int sslFileBufferSize;
-
-  private ApplicationId appId;
-  private QueryId queryId;
-  private FileSystem localFS;
-
-  /**
-   * Should the shuffle use posix_fadvise calls to manage the OS cache during
-   * sendfile
-   */
-  private boolean manageOsCache;
-  private int readaheadLength;
-  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
-   
-
-  public static final String PULLSERVER_SERVICEID = "tajo.pullserver";
-
-  private static final Map<String,String> userRsrc =
-    new ConcurrentHashMap<String,String>();
-  private static String userName;
-
-  public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY =
-    "tajo.pullserver.ssl.file.buffer.size";
-
-  public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024;
-
-  @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo")
-  static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> {
-    @Metric({"OutputBytes","PullServer output in bytes"})
-    MutableCounterLong shuffleOutputBytes;
-    @Metric({"Failed","# of failed shuffle outputs"})
-    MutableCounterInt shuffleOutputsFailed;
-    @Metric({"Succeeded","# of succeeded shuffle outputs"})
-    MutableCounterInt shuffleOutputsOK;
-    @Metric({"Connections","# of current shuffle connections"})
-    MutableGaugeInt shuffleConnections;
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      if (future.isSuccess()) {
-        shuffleOutputsOK.incr();
-      } else {
-        shuffleOutputsFailed.incr();
-      }
-      shuffleConnections.decr();
-    }
-  }
-
-  final ShuffleMetrics metrics;
-
-  PullServerAuxService(MetricsSystem ms) {
-    super("httpshuffle");
-    metrics = ms.register(new ShuffleMetrics());
-  }
-
-  @SuppressWarnings("UnusedDeclaration")
-  public PullServerAuxService() {
-    this(DefaultMetricsSystem.instance());
-  }
-
-  /**
-   * Serialize the shuffle port into a ByteBuffer for use later on.
-   * @param port the port to be sent to the ApplciationMaster
-   * @return the serialized form of the port.
-   */
-  public static ByteBuffer serializeMetaData(int port) throws IOException {
-    //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
-  }
-
-  /**
-   * A helper function to deserialize the metadata returned by PullServerAuxService.
-   * @param meta the metadata returned by the PullServerAuxService
-   * @return the port the PullServer Handler is listening on to serve shuffle data.
-   */
-  public static int deserializeMetaData(ByteBuffer meta) throws IOException {
-    //TODO this should be returning a class not just an int
-    DataInputByteBuffer in = new DataInputByteBuffer();
-    in.reset(meta);
-    return in.readInt();
-  }
-
-  @Override
-  public void initializeApplication(ApplicationInitializationContext appInitContext) {
-    // TODO these bytes should be versioned
-    // TODO: Once SHuffle is out of NM, this can use MR APIs
-    this.appId = appInitContext.getApplicationId();
-    this.queryId = TajoIdUtils.parseQueryId(appId.toString());
-    this.userName = appInitContext.getUser();
-    userRsrc.put(this.appId.toString(), this.userName);
-  }
-
-  @Override
-  public void stopApplication(ApplicationTerminationContext appStopContext) {
-    userRsrc.remove(appStopContext.getApplicationId().toString());
-  }
-
-  @Override
-  public synchronized void init(Configuration conf) {
-    try {
-      manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
-          DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
-      readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
-          DEFAULT_SHUFFLE_READAHEAD_BYTES);
-
-      selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", 0)
-                  .option(ChannelOption.TCP_NODELAY, true)
-                  .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-                  .childOption(ChannelOption.TCP_NODELAY, true);
-
-      localFS = new LocalFileSystem();
-      super.init(new Configuration(conf));
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    }
-  }
-
-  // TODO change AbstractService to throw InterruptedException
-  @Override
-  public synchronized void start() {
-    Configuration conf = getConfig();
-    ServerBootstrap bootstrap = selector.clone();
-    try {
-      initializer = new HttpChannelInitializer(conf);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
-    }
-    bootstrap.channel(NioServerSocketChannel.class)
-            .handler(initializer);
-    port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
-        ConfVars.PULLSERVER_PORT.defaultIntVal);
-    ChannelFuture future = bootstrap.bind(new InetSocketAddress(port))
-            .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE)
-            .syncUninterruptibly();
-    accepted.add(future.channel());
-    port = ((InetSocketAddress)future.channel().localAddress()).getPort();
-    conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port));
-    initializer.PullServer.setPort(port);
-    LOG.info(getName() + " listening on port " + port);
-    super.start();
-
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  @Override
-  public synchronized void stop() {
-    try {
-      accepted.close();
-      if (selector != null) {
-        if (selector.group() != null) {
-          selector.group().shutdownGracefully();
-        }
-        if (selector.childGroup() != null) {
-          selector.childGroup().shutdownGracefully();
-        }
-      }
-
-      if (initializer != null) {
-        initializer.destroy();
-      }
-
-      localFS.close();
-    } catch (Throwable t) {
-      LOG.error(t, t);
-    } finally {
-      super.stop();
-    }
-  }
-
-  @Override
-  public synchronized ByteBuffer getMetaData() {
-    try {
-      return serializeMetaData(port); 
-    } catch (IOException e) {
-      LOG.error("Error during getMeta", e);
-      // TODO add API to AuxiliaryServices to report failures
-      return null;
-    }
-  }
-
-  class HttpChannelInitializer extends ChannelInitializer<Channel> {
-
-    final PullServer PullServer;
-    private SSLFactory sslFactory;
-
-    public HttpChannelInitializer(Configuration conf) throws Exception {
-      PullServer = new PullServer(conf);
-      if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname,
-          ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) {
-        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-        sslFactory.init();
-      }
-    }
-
-    public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
-    }
-
-    @Override
-    protected void initChannel(Channel channel) throws Exception {
-      ChannelPipeline pipeline = channel.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
-
-      pipeline.addLast("encoder", new HttpResponseEncoder());
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", PullServer);
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-  }
-
-  @ChannelHandler.Sharable
-  class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> {
-    private final Configuration conf;
-    private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-    private int port;
-
-    public PullServer(Configuration conf) {
-      this.conf = conf;
-      this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal);
-    }
-    
-    public void setPort(int port) {
-      this.port = port;
-    }
-
-    private List<String> splitMaps(List<String> mapq) {
-      if (null == mapq) {
-        return null;
-      }
-      final List<String> ret = new ArrayList<String>();
-      for (String s : mapq) {
-        Collections.addAll(ret, s.split(","));
-      }
-      return ret;
-    }
-
-    @Override
-    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request)
-        throws Exception {
-      if (request.getMethod() != HttpMethod.GET) {
-        sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
-        return;
-      }
-
-      // Parsing the URL into key-values
-      final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters();
-      final List<String> types = params.get("type");
-      final List<String> taskIdList = params.get("ta");
-      final List<String> stageIds = params.get("sid");
-      final List<String> partitionIds = params.get("p");
-
-      if (types == null || taskIdList == null || stageIds == null || partitionIds == null) {
-        sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST);
-        return;
-      }
-
-      if (types.size() != 1 || stageIds.size() != 1) {
-        sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST);
-        return;
-      }
-
-      final List<FileChunk> chunks = Lists.newArrayList();
-
-      String repartitionType = types.get(0);
-      String sid = stageIds.get(0);
-      String partitionId = partitionIds.get(0);
-      List<String> taskIds = splitMaps(taskIdList);
-
-      // the working dir of tajo worker for each query
-      String queryBaseDir = queryId + "/output" + "/";
-
-      LOG.info("PullServer request param: repartitionType=" + repartitionType + ", sid=" + sid + ", partitionId="
-          + partitionId + ", taskIds=" + taskIdList);
-
-      String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname);
-      if (taskLocalDir == null || taskLocalDir.equals("")) {
-        LOG.error("Tajo local directory should be specified.");
-      }
-      LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir);
-
-      // if a stage requires a range partitioning
-      if (repartitionType.equals("r")) {
-        String ta = taskIds.get(0);
-        Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
-            + "/output/", conf));
-
-        String startKey = params.get("start").get(0);
-        String endKey = params.get("end").get(0);
-        boolean last = params.get("final") != null;
-
-        FileChunk chunk;
-        try {
-          chunk = getFileCunks(path, startKey, endKey, last);
-        } catch (Throwable t) {
-          LOG.error("ERROR Request: " + request.getUri(), t);
-          sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST);
-          return;
-        }
-        if (chunk != null) {
-          chunks.add(chunk);
-        }
-
-        // if a stage requires a hash repartition or a scattered hash
-        // repartition
-      } else if (repartitionType.equals("h") || repartitionType.equals("s")) {
-        for (String ta : taskIds) {
-          Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta
-              + "/output/" + partitionId, conf));
-          File file = new File(path.toUri());
-          FileChunk chunk = new FileChunk(file, 0, file.length());
-          chunks.add(chunk);
-        }
-      } else {
-        LOG.error("Unknown repartition type: " + repartitionType);
-        return;
-      }
-
-      // Write the content.
-      if (chunks.size() == 0) {
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT);
-
-        if (!HttpHeaders.isKeepAlive(request)) {
-          ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-        } else {
-          response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-          ctx.writeAndFlush(response);
-        }
-      } else {
-        FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]);
-        ChannelFuture writeFuture = null;
-        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
-        long totalSize = 0;
-        for (FileChunk chunk : file) {
-          totalSize += chunk.length();
-        }
-        HttpHeaders.setContentLength(response, totalSize);
-
-        if (HttpHeaders.isKeepAlive(request)) {
-          response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-        }
-        // Write the initial line and the header.
-        writeFuture = ctx.write(response);
-
-        for (FileChunk chunk : file) {
-          writeFuture = sendFile(ctx, chunk);
-          if (writeFuture == null) {
-            sendError(ctx, HttpResponseStatus.NOT_FOUND);
-            return;
-          }
-        }
-        if (ctx.pipeline().get(SslHandler.class) == null) {
-          writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-        } else {
-          ctx.flush();
-        }
-
-        // Decide whether to close the connection or not.
-        if (!HttpHeaders.isKeepAlive(request)) {
-          // Close the connection when the whole content is written out.
-          writeFuture.addListener(ChannelFutureListener.CLOSE);
-        }
-      }
-
-    }
-
-    private ChannelFuture sendFile(ChannelHandlerContext ctx,
-                                   FileChunk file) throws IOException {
-      RandomAccessFile spill;
-      try {
-        spill = new RandomAccessFile(file.getFile(), "r");
-      } catch (FileNotFoundException e) {
-        LOG.info(file.getFile() + " not found");
-        return null;
-      }
-
-      ChannelFuture lastContentFuture;
-      if (ctx.pipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            file.startOffset(), file.length(), manageOsCache, readaheadLength,
-            readaheadPool, file.getFile().getAbsolutePath());
-        lastContentFuture = ctx.write(partition);
-        lastContentFuture.addListener(new FileCloseListener(partition, null, 0, null));
-      } else {
-        // HTTPS cannot be done with zero copy.
-        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            file.startOffset(), file.length(), sslFileBufferSize,
-            manageOsCache, readaheadLength, readaheadPool,
-            file.getFile().getAbsolutePath());
-        lastContentFuture = ctx.write(new HttpChunkedInput(chunk));
-      }
-      metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(file.length()); // optimistic
-      return lastContentFuture;
-    }
-    
-    private void sendError(ChannelHandlerContext ctx,
-        HttpResponseStatus status) {
-      sendError(ctx, "", status);
-    }
-
-    private void sendError(ChannelHandlerContext ctx, String message,
-        HttpResponseStatus status) {
-      FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
-              Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
-      response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
-
-      // Close the connection as soon as the error message is sent.
-      ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      Channel ch = ctx.channel();
-      if (cause instanceof TooLongFrameException) {
-        sendError(ctx, HttpResponseStatus.BAD_REQUEST);
-        return;
-      }
-
-      LOG.error("PullServer error: ", cause);
-      if (ch.isActive()) {
-        sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
-      }
-    }
-  }
-
-  public FileChunk getFileCunks(Path outDir,
-                                      String startKey,
-                                      String endKey,
-                                      boolean last) throws IOException {
-    BSTIndex index = new BSTIndex(new TajoConf());
-    BSTIndex.BSTIndexReader idxReader =
-        index.getIndexReader(new Path(outDir, "index"));
-    idxReader.open();
-    Schema keySchema = idxReader.getKeySchema();
-    TupleComparator comparator = idxReader.getComparator();
-
-    LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", "
-        + idxReader.getLastKey());
-
-    File data = new File(URI.create(outDir.toUri() + "/output"));
-    byte [] startBytes = Base64.decodeBase64(startKey);
-    byte [] endBytes = Base64.decodeBase64(endKey);
-
-    RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema);
-    Tuple start;
-    Tuple end;
-    try {
-      start = decoder.toTuple(startBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("StartKey: " + startKey
-          + ", decoded byte size: " + startBytes.length, t);
-    }
-
-    try {
-      end = decoder.toTuple(endBytes);
-    } catch (Throwable t) {
-      throw new IllegalArgumentException("EndKey: " + endKey
-          + ", decoded byte size: " + endBytes.length, t);
-    }
-
-
-    if(!comparator.isAscendingFirstKey()) {
-      Tuple tmpKey = start;
-      start = end;
-      end = tmpKey;
-    }
-
-    LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end +
-        (last ? ", last=true" : "") + ")");
-
-    if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero
-      LOG.info("There is no contents");
-      return null;
-    }
-
-    if (comparator.compare(end, idxReader.getFirstKey()) < 0 ||
-        comparator.compare(idxReader.getLastKey(), start) < 0) {
-      LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() +
-          "], but request start:" + start + ", end: " + end);
-      return null;
-    }
-
-    long startOffset;
-    long endOffset;
-    try {
-      startOffset = idxReader.find(start);
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
-    try {
-      endOffset = idxReader.find(end);
-      if (endOffset == -1) {
-        endOffset = idxReader.find(end, true);
-      }
-    } catch (IOException ioe) {
-      LOG.error("State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: "
-          + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-      throw ioe;
-    }
-
-    // if startOffset == -1 then case 2-1 or case 3
-    if (startOffset == -1) { // this is a hack
-      // if case 2-1 or case 3
-      try {
-        startOffset = idxReader.find(start, true);
-      } catch (IOException ioe) {
-        LOG.error("State Dump (the requested range: "
-            + "[" + start + ", " + end +")" + ", idx min: "
-            + idxReader.getFirstKey() + ", idx max: "
-            + idxReader.getLastKey());
-        throw ioe;
-      }
-    }
-
-    if (startOffset == -1) {
-      throw new IllegalStateException("startOffset " + startOffset + " is negative \n" +
-          "State Dump (the requested range: "
-          + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: "
-          + idxReader.getLastKey());
-    }
-
-    // if greater than indexed values
-    if (last || (endOffset == -1
-        && comparator.compare(idxReader.getLastKey(), end) < 0)) {
-      endOffset = data.length();
-    }
-
-    FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset);
-    LOG.info("Retrieve File Chunk: " + chunk);
-    return chunk;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
----------------------------------------------------------------------
diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
index d030eed..4609712 100644
--- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
+++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.pullserver.PullServerAuxService.PullServer;
 import org.apache.tajo.util.StringUtils;
 
 public class TajoPullServer extends CompositeService {
@@ -40,6 +39,9 @@ public class TajoPullServer extends CompositeService {
 
   @Override
   public void init(Configuration conf) {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
     this.systemConf = (TajoConf)conf;
     pullService = new TajoPullServerService();
     addService(pullService);
@@ -58,7 +60,7 @@ public class TajoPullServer extends CompositeService {
   }
 
   public static void main(String[] args) throws Exception {
-    StringUtils.startupShutdownMessage(PullServer.class, args, LOG);
+    StringUtils.startupShutdownMessage(TajoPullServerService.PullServer.class, args, LOG);
 
     if (!TajoPullServerService.isStandalone()) {
       LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");


Mime
View raw message