Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CD55B1783A for ; Wed, 1 Apr 2015 06:38:50 +0000 (UTC) Received: (qmail 11072 invoked by uid 500); 1 Apr 2015 06:38:50 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 11036 invoked by uid 500); 1 Apr 2015 06:38:50 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 10962 invoked by uid 99); 1 Apr 2015 06:38:50 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Apr 2015 06:38:50 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7114FE0FB2; Wed, 1 Apr 2015 06:38:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhkim@apache.org To: commits@tajo.apache.org Date: Wed, 01 Apr 2015 06:38:51 -0000 Message-Id: In-Reply-To: <34451cc2d08648f1a269eccd785ceae4@git.apache.org> References: <34451cc2d08648f1a269eccd785ceae4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] tajo git commit: TAJO-1482: Cleanup the legacy cluster mode. (jinho) TAJO-1482: Cleanup the legacy cluster mode. (jinho) Closes #484 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/968633ff Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/968633ff Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/968633ff Branch: refs/heads/master Commit: 968633ff06b29101586286980a2099fc3dd4c534 Parents: 487a0e5 Author: Jinho Kim Authored: Wed Apr 1 15:37:56 2015 +0900 Committer: Jinho Kim Committed: Wed Apr 1 15:37:56 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../org/apache/tajo/cli/tools/TajoAdmin.java | 35 +- tajo-client/src/main/proto/ClientProtos.proto | 4 +- .../java/org/apache/tajo/conf/TajoConf.java | 4 +- .../tajo/master/TajoMasterClientService.java | 2 - .../NonForwardQueryResultSystemScanner.java | 47 +- .../tajo/master/rm/TajoResourceTracker.java | 10 +- .../java/org/apache/tajo/master/rm/Worker.java | 4 +- .../apache/tajo/master/rm/WorkerResource.java | 20 - .../apache/tajo/querymaster/QueryMaster.java | 8 +- .../java/org/apache/tajo/worker/TajoWorker.java | 81 +-- .../apache/tajo/worker/TaskRunnerManager.java | 11 +- .../tajo/worker/WorkerHeartbeatService.java | 9 +- .../main/proto/QueryCoordinatorProtocol.proto | 2 - tajo-core/src/main/resources/tajo-default.xml | 12 - .../main/resources/webapps/admin/cluster.jsp | 19 +- .../src/main/resources/webapps/admin/index.jsp | 26 +- .../src/main/resources/webapps/worker/index.jsp | 6 - .../org/apache/tajo/MiniTajoYarnCluster.java | 175 ----- .../org/apache/tajo/TajoTestingCluster.java | 62 +- .../tajo/master/rm/TestTajoResourceManager.java | 14 +- tajo-dist/src/main/bin/start-tajo.sh | 13 +- tajo-dist/src/main/bin/stop-tajo.sh | 11 +- tajo-dist/src/main/bin/tajo | 9 - tajo-dist/src/main/conf/tajo-env.sh | 3 - .../tajo/pullserver/PullServerAuxService.java | 655 ------------------- .../apache/tajo/pullserver/TajoPullServer.java | 6 +- .../tajo/pullserver/TajoPullServerService.java | 115 ++-- .../retriever/DirectoryRetriever.java | 2 +- 29 files changed, 123 insertions(+), 1244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 4e1a27c..12bf84e 100644 --- a/CHANGES +++ b/CHANGES @@ -107,6 +107,8 @@ Release 0.11.0 - unreleased TASKS + TAJO-1482: Cleanup the legacy cluster mode. (jinho) + TAJO-1439: Some method name is written wrongly. (Contributed by Jongyoung Park. Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java index 98ccc5f..4f56649 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoAdmin.java @@ -23,14 +23,16 @@ import org.apache.commons.cli.*; import org.apache.commons.lang.StringUtils; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; -import org.apache.tajo.client.*; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; +import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo; import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo; -import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.TajoIdUtils; import java.io.IOException; @@ -230,24 +232,15 @@ public class TajoAdmin { List deadQueryMasters = new ArrayList(); for (WorkerResourceInfo eachWorker : workerList) { - if(eachWorker.getQueryMasterMode() == true) { - if(eachWorker.getWorkerStatus().equals(WorkerStatus.RUNNING.toString())) { - liveQueryMasters.add(eachWorker); - runningQueryMasterTasks += eachWorker.getNumQueryMasterTasks(); - } - if(eachWorker.getWorkerStatus().equals(WorkerStatus.LOST.toString())) { - deadQueryMasters.add(eachWorker); - } - } - - if(eachWorker.getTaskRunnerMode() == true) { - if(eachWorker.getWorkerStatus().equals(WorkerStatus.RUNNING.toString())) { - liveWorkers.add(eachWorker); - } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.LOST.toString())) { - deadWorkers.add(eachWorker); - } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.DECOMMISSIONED.toString())) { - decommissionWorkers.add(eachWorker); - } + if(eachWorker.getWorkerStatus().equals(WorkerStatus.RUNNING.toString())) { + liveQueryMasters.add(eachWorker); + liveWorkers.add(eachWorker); + runningQueryMasterTasks += eachWorker.getNumQueryMasterTasks(); + } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.LOST.toString())) { + deadQueryMasters.add(eachWorker); + deadWorkers.add(eachWorker); + } else if(eachWorker.getWorkerStatus().equals(WorkerStatus.DECOMMISSIONED.toString())) { + decommissionWorkers.add(eachWorker); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-client/src/main/proto/ClientProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 4c359a2..ecb136e 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -189,9 +189,7 @@ message WorkerResourceInfo { required int32 numRunningTasks = 11; required string workerStatus = 12; required int64 lastHeartbeat = 13; - required bool queryMasterMode = 14; - required bool taskRunnerMode = 15; - required int32 numQueryMasterTasks = 16; + required int32 numQueryMasterTasks = 14; } message GetClusterInfoResponse { http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index ecdb2ef..53773d8 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -192,11 +192,9 @@ public class TajoConf extends Configuration { // for Yarn Resource Manager ---------------------------------------------- /** how many launching TaskRunners in parallel */ - YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512, Validators.min("64")), - YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1), + @Deprecated YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", Runtime.getRuntime().availableProcessors() * 2), - YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8), // Query Configuration QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")), http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 418c30b..4fcdc88 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -661,8 +661,6 @@ public class TajoMasterClientService extends AbstractService { workerBuilder.setUsedCpuCoreSlots(workerResource.getUsedCpuCoreSlots()); workerBuilder.setUsedDiskSlots(workerResource.getUsedDiskSlots()); workerBuilder.setWorkerStatus(worker.getState().toString()); - workerBuilder.setQueryMasterMode(workerResource.isQueryMasterMode()); - workerBuilder.setTaskRunnerMode(workerResource.isTaskRunnerMode()); workerBuilder.setMaxHeap(workerResource.getMaxHeap()); workerBuilder.setFreeHeap(workerResource.getFreeHeap()); workerBuilder.setTotalHeap(workerResource.getTotalHeap()); http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index c2ccf34..5901aa7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -18,33 +18,14 @@ package org.apache.tajo.master.exec; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.Stack; - +import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; -import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; -import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; -import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.conf.TajoConf; @@ -69,14 +50,15 @@ import org.apache.tajo.plan.logical.IndexScanNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.*; public class NonForwardQueryResultSystemScanner implements NonForwardQueryResultScanner { @@ -539,22 +521,13 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult private List getClusterInfo(Schema outSchema) { Map workerMap = masterContext.getResourceManager().getWorkers(); - Set keySet = workerMap.keySet(); - List tuples = Collections.emptyList(); + List tuples; List queryMasterList = new ArrayList(); List workerList = new ArrayList(); - for (Integer keyId: keySet) { - Worker aWorker = workerMap.get(keyId); - WorkerResource aResource = aWorker.getResource(); - - if (aResource.isQueryMasterMode()) { - queryMasterList.add(aWorker); - } - - if (aResource.isTaskRunnerMode()) { - workerList.add(aWorker); - } + for (Worker aWorker: workerMap.values()) { + queryMasterList.add(aWorker); + workerList.add(aWorker); } tuples = new ArrayList(queryMasterList.size() + workerList.size()); http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index 920fd39..334dbf6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -18,7 +18,6 @@ package org.apache.tajo.master.rm; -import com.google.common.base.Preconditions; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import org.apache.commons.logging.Log; @@ -76,7 +75,9 @@ public class TajoResourceTracker extends AbstractService implements TajoResource @Override public void serviceInit(Configuration conf) { - Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance"); + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } TajoConf systemConf = (TajoConf) conf; String confMasterServiceAddr = systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS); @@ -180,12 +181,7 @@ public class TajoResourceTracker extends AbstractService implements TajoResource } private Worker createWorkerResource(NodeHeartbeat request) { - boolean queryMasterMode = request.getServerStatus().getQueryMasterMode().getValue(); - boolean taskRunnerMode = request.getServerStatus().getTaskRunnerMode().getValue(); - WorkerResource workerResource = new WorkerResource(); - workerResource.setQueryMasterMode(queryMasterMode); - workerResource.setTaskRunnerMode(taskRunnerMode); if(request.getServerStatus() != null) { workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB()); http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java index d426e80..a2ab598 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/Worker.java @@ -196,9 +196,7 @@ public class Worker implements EventHandler, Comparable { @Override public void transition(Worker worker, WorkerEvent workerEvent) { - if(worker.getResource().isQueryMasterMode()) { - worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId()); - } + worker.rmContext.getQueryMasterWorker().add(worker.getWorkerId()); LOG.info("Worker with " + worker.getResource() + " is joined to Tajo cluster"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java index bfe186c..5f2d33c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResource.java @@ -57,10 +57,6 @@ public class WorkerResource { private final Lock rlock = lock.readLock(); private final Lock wlock = lock.writeLock(); - private boolean queryMasterMode; - - private boolean taskRunnerMode; - private AtomicInteger numQueryMasterTasks = new AtomicInteger(0); public float getDiskSlots() { @@ -145,22 +141,6 @@ public class WorkerResource { return usedDiskSlots; } - public boolean isQueryMasterMode() { - return queryMasterMode; - } - - public void setQueryMasterMode(boolean queryMasterMode) { - this.queryMasterMode = queryMasterMode; - } - - public boolean isTaskRunnerMode() { - return taskRunnerMode; - } - - public void setTaskRunnerMode(boolean taskRunnerMode) { - this.taskRunnerMode = taskRunnerMode; - } - public void releaseResource(float diskSlots, int memoryMB) { try { wlock.lock(); http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index bf23133..9cbfb95 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -167,10 +167,7 @@ public class QueryMaster extends CompositeService implements EventHandler { super.stop(); - LOG.info("QueryMaster stop"); - if(queryMasterContext.getWorkerContext().isYarnContainerMode()) { - queryMasterContext.getWorkerContext().stopWorker(true); - } + LOG.info("QueryMaster stopped"); } protected void cleanupExecutionBlock(List executionBlockIds) { @@ -379,9 +376,6 @@ public class QueryMaster extends CompositeService implements EventHandler { } } } - if(workerContext.isYarnContainerMode()) { - stop(); - } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 3c55add..1d0293b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -75,12 +75,6 @@ public class TajoWorker extends CompositeService { public static final PrimitiveProtos.BoolProto TRUE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(true).build(); public static final PrimitiveProtos.BoolProto FALSE_PROTO = PrimitiveProtos.BoolProto.newBuilder().setValue(false).build(); - public static final String WORKER_MODE_YARN_TASKRUNNER = "tr"; - public static final String WORKER_MODE_YARN_QUERYMASTER = "qm"; - public static final String WORKER_MODE_STANDBY = "standby"; - public static final String WORKER_MODE_QUERY_MASTER = "standby-qm"; - public static final String WORKER_MODE_TASKRUNNER = "standby-tr"; - private static final Log LOG = LogFactory.getLog(TajoWorker.class); private TajoConf systemConf; @@ -103,15 +97,6 @@ public class TajoWorker extends CompositeService { private TajoPullServerService pullService; - @Deprecated - private boolean yarnContainerMode; - - @Deprecated - private boolean queryMasterMode; - - @Deprecated - private boolean taskRunnerMode; - private ServiceTracker serviceTracker; private WorkerHeartbeatService workerHeartbeatThread; @@ -151,39 +136,11 @@ public class TajoWorker extends CompositeService { public void startWorker(TajoConf systemConf, String[] args) { this.systemConf = systemConf; this.cmdArgs = args; - setWorkerMode(args); init(systemConf); start(); } - private void setWorkerMode(String[] args) { - if(args.length < 1) { - queryMasterMode = systemConf.getBoolean("tajo.worker.mode.querymaster", true); - taskRunnerMode = systemConf.getBoolean("tajo.worker.mode.taskrunner", true); - } else { - if(WORKER_MODE_STANDBY.equals(args[0])) { - queryMasterMode = true; - taskRunnerMode = true; - } else if(WORKER_MODE_YARN_TASKRUNNER.equals(args[0])) { - yarnContainerMode = true; - queryMasterMode = true; - } else if(WORKER_MODE_YARN_QUERYMASTER.equals(args[0])) { - yarnContainerMode = true; - taskRunnerMode = true; - } else if(WORKER_MODE_QUERY_MASTER.equals(args[0])) { - yarnContainerMode = false; - queryMasterMode = true; - } else { - yarnContainerMode = false; - taskRunnerMode = true; - } - } - if(!queryMasterMode && !taskRunnerMode) { - LOG.fatal("Worker daemon exit cause no worker mode(querymaster/taskrunner) property"); - System.exit(0); - } - } - + @Override public void serviceInit(Configuration conf) throws Exception { if (!(conf instanceof TajoConf)) { @@ -205,6 +162,7 @@ public class TajoWorker extends CompositeService { if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) { randomPort = false; } + int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort(); int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort(); int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort(); @@ -237,7 +195,7 @@ public class TajoWorker extends CompositeService { addIfService(workerHeartbeatThread); int httpPort = 0; - if(taskRunnerMode && !TajoPullServerService.isStandalone()) { + if(!TajoPullServerService.isStandalone()) { pullService = new TajoPullServerService(); addIfService(pullService); } @@ -263,8 +221,7 @@ public class TajoWorker extends CompositeService { queryMasterManagerService.getBindAddr().getPort(), httpPort); - LOG.info("Tajo Worker is initialized. \r\nQueryMaster=" + queryMasterMode + " TaskRunner=" + taskRunnerMode - + " connection :" + connectionInfo.toString()); + LOG.info("Tajo Worker is initialized." + " connection :" + connectionInfo.toString()); try { hashShuffleAppenderManager = new HashShuffleAppenderManager(systemConf); @@ -312,10 +269,6 @@ public class TajoWorker extends CompositeService { private int initWebServer() { int httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort(); try { - if (queryMasterMode && !taskRunnerMode) { - //If QueryMaster and TaskRunner run on single host, http port conflicts - httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort(); - } webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort, true, null, systemConf, null); webServer.start(); @@ -457,18 +410,7 @@ public class TajoWorker extends CompositeService { } public String getWorkerName() { - if (queryMasterMode) { - return getQueryMasterManagerService().getHostAndPort(); - } else { - return connectionInfo.getHostAndPeerRpcPort(); - } - } - - public void stopWorker(boolean force) { - stop(); - if (force) { - System.exit(0); - } + return connectionInfo.getHostAndPeerRpcPort(); } public LocalDirAllocator getLocalDirAllocator(){ @@ -515,11 +457,6 @@ public class TajoWorker extends CompositeService { } } - @Deprecated - public boolean isYarnContainerMode() { - return yarnContainerMode; - } - public void setNumClusterNodes(int numClusterNodes) { TajoWorker.this.numClusterNodes.set(numClusterNodes); } @@ -536,14 +473,6 @@ public class TajoWorker extends CompositeService { } } - public boolean isQueryMasterMode() { - return queryMasterMode; - } - - public boolean isTaskRunnerMode() { - return taskRunnerMode; - } - public TajoSystemMetrics getWorkerSystemMetrics() { return workerSystemMetrics; } http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index a375a31..11a95ce 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -18,7 +18,6 @@ package org.apache.tajo.worker; -import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,7 +62,9 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< @Override public void init(Configuration conf) { - Preconditions.checkArgument(conf instanceof TajoConf); + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } tajoConf = (TajoConf)conf; dispatcher.register(TaskRunnerEvent.EventType.class, this); super.init(tajoConf); @@ -98,18 +99,12 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< } super.stop(); - if(workerContext.isYarnContainerMode()) { - workerContext.stopWorker(true); - } } public void stopTaskRunner(String id) { LOG.info("Stop Task:" + id); TaskRunner taskRunner = taskRunnerMap.remove(id); taskRunner.stop(); - if(workerContext.isYarnContainerMode()) { - stop(); - } } public Collection getTaskRunners() { http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index 5493b37..e9f90ca 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -18,7 +18,6 @@ package org.apache.tajo.worker; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; @@ -26,7 +25,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto; import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; @@ -34,7 +32,6 @@ import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.storage.DiskDeviceInfo; import org.apache.tajo.storage.DiskMountInfo; @@ -72,7 +69,9 @@ public class WorkerHeartbeatService extends AbstractService { @Override public void serviceInit(Configuration conf) throws Exception { - Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance."); + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } this.systemConf = (TajoConf) conf; connectionPool = RpcConnectionPool.getPool(); @@ -170,8 +169,6 @@ public class WorkerHeartbeatService extends AbstractService { .setDiskSlots(workerDiskSlots) .setMemoryResourceMB(workerMemoryMB) .setJvmHeap(jvmHeap) - .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode())) - .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode())) .build(); NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder() http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto index cef385e..2440e2a 100644 --- a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto +++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto @@ -58,8 +58,6 @@ message ServerStatusProto { repeated Disk disk = 4; required int32 runningTaskNum = 5; required JvmHeap jvmHeap = 6; - required BoolProto queryMasterMode = 7; - required BoolProto taskRunnerMode = 8; } message TajoHeartbeat { http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/tajo-default.xml ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/tajo-default.xml b/tajo-core/src/main/resources/tajo-default.xml index 4a92e72..d9c21bb 100644 --- a/tajo-core/src/main/resources/tajo-default.xml +++ b/tajo-core/src/main/resources/tajo-default.xml @@ -26,18 +26,6 @@ - tajo.worker.mode.querymaster - true - - - - - tajo.worker.mode.taskrunner - true - - - - tajo.querymaster.task-scheduler org.apache.tajo.querymaster.DefaultTaskScheduler http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/webapps/admin/cluster.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/cluster.jsp b/tajo-core/src/main/resources/webapps/admin/cluster.jsp index aca1153..816a144 100644 --- a/tajo-core/src/main/resources/webapps/admin/cluster.jsp +++ b/tajo-core/src/main/resources/webapps/admin/cluster.jsp @@ -48,26 +48,17 @@ Set deadQueryMasters = new TreeSet(); for(Worker eachWorker: workers.values()) { - if(eachWorker.getResource().isQueryMasterMode()) { - liveQueryMasters.add(eachWorker); - runningQueryMasterTasks += eachWorker.getResource().getNumQueryMasterTasks(); - } - - if(eachWorker.getResource().isTaskRunnerMode()) { - liveWorkers.add(eachWorker); - } + liveQueryMasters.add(eachWorker); + liveWorkers.add(eachWorker); + runningQueryMasterTasks += eachWorker.getResource().getNumQueryMasterTasks(); } for (Worker inactiveWorker : master.getContext().getResourceManager().getInactiveWorkers().values()) { WorkerState state = inactiveWorker.getState(); if (state == WorkerState.LOST) { - if (inactiveWorker.getResource().isQueryMasterMode()) { - deadQueryMasters.add(inactiveWorker); - } - if (inactiveWorker.getResource().isTaskRunnerMode()) { - deadWorkers.add(inactiveWorker); - } + deadQueryMasters.add(inactiveWorker); + deadWorkers.add(inactiveWorker); } else if (state == WorkerState.DECOMMISSIONED) { decommissionWorkers.add(inactiveWorker); } http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 0a0558e..468fc72 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -57,27 +57,19 @@ master.getContext().getResourceManager().getClusterResourceSummary(); for(Worker eachWorker: workers.values()) { - if(eachWorker.getResource().isQueryMasterMode()) { - numQueryMasters++; - numLiveQueryMasters++; - runningQueryMasterTask += eachWorker.getResource().getNumQueryMasterTasks(); - } - if(eachWorker.getResource().isTaskRunnerMode()) { - numWorkers++; - numLiveWorkers++; - } + numQueryMasters++; + numLiveQueryMasters++; + runningQueryMasterTask += eachWorker.getResource().getNumQueryMasterTasks(); + numWorkers++; + numLiveWorkers++; } for (Worker eachWorker : inactiveWorkers.values()) { if (eachWorker.getState() == WorkerState.LOST) { - if(eachWorker.getResource().isQueryMasterMode()) { - numQueryMasters++; - numDeadQueryMasters++; - } - if(eachWorker.getResource().isTaskRunnerMode()) { - numWorkers++; - numDeadWorkers++; - } + numQueryMasters++; + numDeadQueryMasters++; + numWorkers++; + numDeadWorkers++; } else if(eachWorker.getState() == WorkerState.DECOMMISSIONED) { numDecommissionWorkers++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/main/resources/webapps/worker/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/index.jsp b/tajo-core/src/main/resources/webapps/worker/index.jsp index bb72f9e..bc3cb1e 100644 --- a/tajo-core/src/main/resources/webapps/worker/index.jsp +++ b/tajo-core/src/main/resources/webapps/worker/index.jsp @@ -59,7 +59,6 @@
<% -if(tajoWorker.getWorkerContext().isQueryMasterMode()) { List queryMasterTasks = JSPUtil.sortQueryMasterTask(tajoWorker.getWorkerContext() .getQueryMasterManagerService().getQueryMaster().getQueryMasterTasks(), true); @@ -122,8 +121,6 @@ if(tajoWorker.getWorkerContext().isQueryMasterMode()) {


<% -} // end of QueryMaster -if(tajoWorker.getWorkerContext().isTaskRunnerMode()) { List taskRunners = new ArrayList(tajoWorker.getWorkerContext().getTaskRunnerManager().getTaskRunners()); JSPUtil.sortTaskRunner(taskRunners); %> @@ -145,9 +142,6 @@ if(tajoWorker.getWorkerContext().isTaskRunnerMode()) { } //end of for %> -<% -} //end of if -%> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java deleted file mode 100644 index 88d913d..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.LocalContainerLauncher; -import org.apache.hadoop.mapreduce.MRConfig; -import org.apache.hadoop.mapreduce.MRJobConfig; -import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; -import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.service.Service; -import org.apache.hadoop.util.JarFinder; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.server.MiniYARNCluster; -import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.pullserver.PullServerAuxService; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; - -/** - * Configures and starts the Tajo-specific components in the YARN cluster. - * - */ -public class MiniTajoYarnCluster extends MiniYARNCluster { - - public static final String APPJAR = JarFinder - .getJar(LocalContainerLauncher.class); - - private static final Log LOG = LogFactory.getLog(MiniTajoYarnCluster.class); - - public MiniTajoYarnCluster(String testName) { - this(testName, 1); - } - - public MiniTajoYarnCluster(String testName, int noOfNMs) { - super(testName, noOfNMs, 1, 1); - } - - @Override - public void init(Configuration conf) { - - conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress("127.0.0.1", 0)); - conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress("127.0.0.1", 0)); - - conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME); - if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) { - conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(), - "apps_staging_dir/").getAbsolutePath()); - } - conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "000"); - - try { - Path stagingPath = FileContext.getFileContext(conf).makeQualified( - new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR))); - FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf); - if (fc.util().exists(stagingPath)) { - LOG.info(stagingPath + " exists! deleting..."); - fc.delete(stagingPath, true); - } - LOG.info("mkdir: " + stagingPath); - //mkdir the staging directory so that right permissions are set while running as proxy user - fc.mkdir(stagingPath, null, true); - //mkdir done directory as well - String doneDir = JobHistoryUtils - .getConfiguredHistoryServerDoneDirPrefix(conf); - Path doneDirPath = fc.makeQualified(new Path(doneDir)); - fc.mkdir(doneDirPath, null, true); - } catch (IOException e) { - throw new YarnRuntimeException("Could not create staging directory. ", e); - } - conf.set(MRConfig.MASTER_ADDRESS, "test"); // The default is local because of - // which shuffle doesn't happen - //configure the shuffle service in NM - conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, PullServerAuxService.PULLSERVER_SERVICEID); - conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT, - PullServerAuxService.PULLSERVER_SERVICEID), PullServerAuxService.class, - Service.class); - - // Non-standard shuffle port - conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0); - - // local directory - conf.set(TajoConf.ConfVars.WORKER_TEMPORAL_DIR.name(), "/tmp/tajo-localdir"); - - conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR, - DefaultContainerExecutor.class, ContainerExecutor.class); - - // TestMRJobs is for testing non-uberized operation only; see TestUberAM - // for corresponding uberized tests. - conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); - - conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 600); - - super.init(conf); - } - - @Override - public void start() { - super.start(); - - LOG.info("MiniTajoYarn NM Local Dir: " + getConfig().get(YarnConfiguration.NM_LOCAL_DIRS)); - } - - private class JobHistoryServerWrapper extends AbstractService { - public JobHistoryServerWrapper() { - super(JobHistoryServerWrapper.class.getName()); - } - - @Override - public synchronized void start() { - try { - if (!getConfig().getBoolean( - JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS, - JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) { - // pick free random ports. - getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS, - MiniYARNCluster.getHostname() + ":0"); - getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS, - MiniYARNCluster.getHostname() + ":0"); - } - super.start(); - } catch (Throwable t) { - throw new YarnRuntimeException(t); - } - - LOG.info("MiniMRYARN ResourceManager address: " + - getConfig().get(YarnConfiguration.RM_ADDRESS)); - LOG.info("MiniMRYARN ResourceManager web address: " + - getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS)); - LOG.info("MiniMRYARN HistoryServer address: " + - getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS)); - LOG.info("MiniMRYARN HistoryServer web address: " + - getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS)); - } - - @Override - public synchronized void stop() { - super.stop(); - } - } - - public static void main(String [] args) { - MiniTajoYarnCluster cluster = new MiniTajoYarnCluster(MiniTajoYarnCluster.class.getName()); - cluster.init(new TajoConf()); - cluster.start(); - } -} - http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 17348e1..9d9310e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -26,13 +26,10 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.tajo.catalog.*; @@ -56,9 +53,10 @@ import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; -import java.io.*; +import java.io.File; +import java.io.IOException; +import java.io.Writer; import java.net.InetSocketAddress; -import java.net.URL; import java.sql.ResultSet; import java.util.ArrayList; import java.util.List; @@ -68,8 +66,6 @@ import java.util.UUID; public class TajoTestingCluster { private static Log LOG = LogFactory.getLog(TajoTestingCluster.class); private TajoConf conf; - - protected MiniTajoYarnCluster yarnCluster; private FileSystem defaultFS; private MiniDFSCluster dfsCluster; private MiniCatalogServer catalogServer; @@ -77,7 +73,6 @@ public class TajoTestingCluster { private TajoMaster tajoMaster; private List tajoWorkers = new ArrayList(); - private boolean standbyWorkerMode = false; private boolean isDFSRunning = false; private boolean isTajoClusterRunning = false; private boolean isCatalogServerRunning = false; @@ -165,9 +160,6 @@ public class TajoTestingCluster { // Memory cache termination conf.setIntVar(ConfVars.WORKER_HISTORY_EXPIRE_PERIOD, 1); - this.standbyWorkerMode = conf.getVar(ConfVars.RESOURCE_MANAGER_CLASS) - .indexOf(TajoWorkerResourceManager.class.getName()) >= 0; - /* Since Travi CI limits the size of standard output log up to 4MB */ if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); @@ -370,9 +362,8 @@ public class TajoTestingCluster { this.conf.setVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS, c.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS)); this.conf.setVar(ConfVars.CATALOG_ADDRESS, c.getVar(ConfVars.CATALOG_ADDRESS)); - if(standbyWorkerMode) { - startTajoWorkers(numSlaves); - } + startTajoWorkers(numSlaves); + isTajoClusterRunning = true; LOG.info("Mini Tajo cluster is up"); LOG.info("===================================================================================="); @@ -434,7 +425,7 @@ public class TajoTestingCluster { workerConf.setVar(ConfVars.WORKER_QM_RPC_ADDRESS, "localhost:0"); - tajoWorker.startWorker(workerConf, new String[]{"standby"}); + tajoWorker.startWorker(workerConf, new String[0]); LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started."); tajoWorkers.add(tajoWorker); @@ -512,46 +503,9 @@ public class TajoTestingCluster { hbaseUtil = new HBaseTestClusterUtil(conf, clusterTestBuildDir); - if(!standbyWorkerMode) { - startMiniYarnCluster(); - } - startMiniTajoCluster(this.clusterTestBuildDir, numSlaves, false); } - private void startMiniYarnCluster() throws Exception { - LOG.info("Starting up YARN cluster"); - // Scheduler properties required for YARN to work - conf.set("yarn.scheduler.capacity.root.queues", "default"); - conf.set("yarn.scheduler.capacity.root.default.capacity", "100"); - - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1); - conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 2); - - if (yarnCluster == null) { - yarnCluster = new MiniTajoYarnCluster(TajoTestingCluster.class.getName(), 3); - yarnCluster.init(conf); - yarnCluster.start(); - - ResourceManager resourceManager = yarnCluster.getResourceManager(); - InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress(); - InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress(); - conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr)); - conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr)); - - URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml"); - if (url == null) { - throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath"); - } - yarnCluster.getConfig().set("yarn.application.classpath", new File(url.getPath()).getParent()); - OutputStream os = new FileOutputStream(new File(url.getPath())); - yarnCluster.getConfig().writeXml(os); - os.close(); - } - } - public void startMiniClusterInLocal(final int numSlaves) throws Exception { isRunningCluster(); @@ -580,10 +534,6 @@ public class TajoTestingCluster { isCatalogServerRunning = false; } - if(this.yarnCluster != null) { - this.yarnCluster.stop(); - } - try { Thread.sleep(3000); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java index a013d0b..2c997a3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java @@ -49,7 +49,7 @@ public class TestTajoResourceManager { int workerMemoryMB = 512 * 10; WorkerResourceAllocationResponse response; - private TajoWorkerResourceManager initResourceManager(boolean queryMasterMode) throws Exception { + private TajoWorkerResourceManager initResourceManager() throws Exception { tajoConf = new org.apache.tajo.conf.TajoConf(); tajoConf.setFloatVar(TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT, 0.0f); @@ -85,8 +85,6 @@ public class TestTajoResourceManager { disks.add(disk); ServerStatusProto serverStatus = ServerStatusProto.newBuilder() - .setQueryMasterMode(queryMasterMode ? BOOL_TRUE : BOOL_FALSE) - .setTaskRunnerMode(BOOL_TRUE) .setDiskSlots(workerDiskSlots) .setMemoryResourceMB(workerMemoryMB) .setJvmHeap(jvmHeap) @@ -113,7 +111,7 @@ public class TestTajoResourceManager { public void testHeartbeat() throws Exception { TajoWorkerResourceManager tajoWorkerResourceManager = null; try { - tajoWorkerResourceManager = initResourceManager(false); + tajoWorkerResourceManager = initResourceManager(); assertEquals(numWorkers, tajoWorkerResourceManager.getWorkers().size()); for(Worker worker: tajoWorkerResourceManager.getWorkers().values()) { WorkerResource resource = worker.getResource(); @@ -131,7 +129,7 @@ public class TestTajoResourceManager { public void testMemoryResource() throws Exception { TajoWorkerResourceManager tajoWorkerResourceManager = null; try { - tajoWorkerResourceManager = initResourceManager(false); + tajoWorkerResourceManager = initResourceManager(); final int minMemory = 256; final int maxMemory = 512; @@ -215,7 +213,7 @@ public class TestTajoResourceManager { TajoWorkerResourceManager tajoWorkerResourceManager = null; try { - tajoWorkerResourceManager = initResourceManager(false); + tajoWorkerResourceManager = initResourceManager(); final int minMemory = 200; final int maxMemory = 500; @@ -300,7 +298,7 @@ public class TestTajoResourceManager { TajoWorkerResourceManager tajoWorkerResourceManager = null; try { - tajoWorkerResourceManager = initResourceManager(false); + tajoWorkerResourceManager = initResourceManager(); final float minDiskSlots = 1.0f; final float maxDiskSlots = 2.0f; @@ -382,7 +380,7 @@ public class TestTajoResourceManager { TajoWorkerResourceManager tajoWorkerResourceManager = null; try { - tajoWorkerResourceManager = initResourceManager(false); + tajoWorkerResourceManager = initResourceManager(); final float minDiskSlots = 1.0f; final float maxDiskSlots = 2.0f; http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/bin/start-tajo.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/start-tajo.sh b/tajo-dist/src/main/bin/start-tajo.sh index 8fcf1a6..efc0371 100755 --- a/tajo-dist/src/main/bin/start-tajo.sh +++ b/tajo-dist/src/main/bin/start-tajo.sh @@ -44,12 +44,9 @@ if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then . "${TAJO_CONF_DIR}/tajo-env.sh" fi -if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then - if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then - "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start pullserver - fi - "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start worker - if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then - "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start querymaster - fi + +if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then + "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start pullserver fi + +"$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" start worker http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/bin/stop-tajo.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/stop-tajo.sh b/tajo-dist/src/main/bin/stop-tajo.sh index 770034b..32a4a4c 100755 --- a/tajo-dist/src/main/bin/stop-tajo.sh +++ b/tajo-dist/src/main/bin/stop-tajo.sh @@ -44,13 +44,8 @@ if [ -f "${TAJO_CONF_DIR}/tajo-env.sh" ]; then . "${TAJO_CONF_DIR}/tajo-env.sh" fi -if [ "$TAJO_WORKER_STANDBY_MODE" = "true" ]; then - "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop worker - if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then - "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop pullserver - fi - if [ -f "${TAJO_CONF_DIR}/querymasters" ]; then - "$bin/tajo-daemons.sh" --hosts querymasters cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop querymaster - fi +"$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop worker +if [ "$TAJO_PULLSERVER_STANDALONE" = "true" ]; then + "$bin/tajo-daemons.sh" cd "$TAJO_HOME" \; "$bin/tajo-daemon.sh" stop pullserver fi http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/bin/tajo ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo index cc611d5..a92d1a9 100755 --- a/tajo-dist/src/main/bin/tajo +++ b/tajo-dist/src/main/bin/tajo @@ -390,15 +390,6 @@ elif [ "$COMMAND" = "worker" ] ; then elif [ "$COMMAND" = "pullserver" ] ; then CLASS='org.apache.tajo.pullserver.TajoPullServer' TAJO_OPTS="$TAJO_OPTS $JAVA_PULLSERVER_HEAP_MAX $TAJO_PULLSERVER_OPTS" -elif [ "$COMMAND" = "querymaster" ] ; then - CLASS='org.apache.tajo.worker.TajoWorker' - TAJO_OPTS="$TAJO_OPTS $JAVA_QUERYMASTER_HEAP_MAX $TAJO_QUERYMASTER_OPTS" - TAJO_DAEMON_MODE='standby-qm' -elif [ "$COMMAND" = "taskrunner" ] ; then - CLASS='org.apache.tajo.worker.TajoWorker' - TAJO_OPTS="$TAJO_OPTS $TAJO_WORKER_OPTS" - TAJO_OPTS="$TAJO_OPTS $JAVA_WORKER_HEAP_MAX $TAJO_WORKER_OPTS" - TAJO_DAEMON_MODE='standby-tr' elif [ "$COMMAND" = "catalog" ] ; then CLASS='org.apache.tajo.catalog.CatalogServer' TAJO_OPTS="$TAJO_OPTS $TAJO_CATALOG_OPTS" http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-dist/src/main/conf/tajo-env.sh ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/conf/tajo-env.sh b/tajo-dist/src/main/conf/tajo-env.sh index cbb881e..d52a827 100755 --- a/tajo-dist/src/main/conf/tajo-env.sh +++ b/tajo-dist/src/main/conf/tajo-env.sh @@ -72,9 +72,6 @@ # The scheduling priority for daemon processes. See 'man nice'. # export TAJO_NICENESS=10 -# Tajo cluster mode. the default mode is standby mode. -export TAJO_WORKER_STANDBY_MODE=true - # It must be required to use HCatalogStore # export HIVE_HOME= # export HIVE_JDBC_DRIVER_DIR= http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java deleted file mode 100644 index fba0593..0000000 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ /dev/null @@ -1,655 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import com.google.common.collect.Lists; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.channel.*; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.*; -import io.netty.handler.ssl.SslHandler; -import io.netty.handler.stream.ChunkedWriteHandler; -import io.netty.util.CharsetUtil; -import io.netty.util.concurrent.GenericFutureListener; -import io.netty.util.concurrent.GlobalEventExecutor; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.ReadaheadPool; -import org.apache.hadoop.metrics2.MetricsSystem; -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterInt; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; -import org.apache.hadoop.metrics2.lib.MutableGaugeInt; -import org.apache.hadoop.security.ssl.SSLFactory; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext; -import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext; -import org.apache.hadoop.yarn.server.api.AuxiliaryService; -import org.apache.tajo.QueryId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.pullserver.retriever.FileChunk; -import org.apache.tajo.rpc.RpcChannelFactory; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; -import org.apache.tajo.storage.index.bst.BSTIndex; -import org.apache.tajo.util.TajoIdUtils; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.net.InetSocketAddress; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class PullServerAuxService extends AuxiliaryService { - - private static final Log LOG = LogFactory.getLog(PullServerAuxService.class); - - public static final String SHUFFLE_MANAGE_OS_CACHE = "tajo.pullserver.manage.os.cache"; - public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true; - - public static final String SHUFFLE_READAHEAD_BYTES = "tajo.pullserver.readahead.bytes"; - public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; - - private int port; - private ServerBootstrap selector; - private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - private HttpChannelInitializer initializer; - private int sslFileBufferSize; - - private ApplicationId appId; - private QueryId queryId; - private FileSystem localFS; - - /** - * Should the shuffle use posix_fadvise calls to manage the OS cache during - * sendfile - */ - private boolean manageOsCache; - private int readaheadLength; - private ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); - - - public static final String PULLSERVER_SERVICEID = "tajo.pullserver"; - - private static final Map userRsrc = - new ConcurrentHashMap(); - private static String userName; - - public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = - "tajo.pullserver.ssl.file.buffer.size"; - - public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; - - @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo") - static class ShuffleMetrics implements GenericFutureListener { - @Metric({"OutputBytes","PullServer output in bytes"}) - MutableCounterLong shuffleOutputBytes; - @Metric({"Failed","# of failed shuffle outputs"}) - MutableCounterInt shuffleOutputsFailed; - @Metric({"Succeeded","# of succeeded shuffle outputs"}) - MutableCounterInt shuffleOutputsOK; - @Metric({"Connections","# of current shuffle connections"}) - MutableGaugeInt shuffleConnections; - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - shuffleOutputsOK.incr(); - } else { - shuffleOutputsFailed.incr(); - } - shuffleConnections.decr(); - } - } - - final ShuffleMetrics metrics; - - PullServerAuxService(MetricsSystem ms) { - super("httpshuffle"); - metrics = ms.register(new ShuffleMetrics()); - } - - @SuppressWarnings("UnusedDeclaration") - public PullServerAuxService() { - this(DefaultMetricsSystem.instance()); - } - - /** - * Serialize the shuffle port into a ByteBuffer for use later on. - * @param port the port to be sent to the ApplciationMaster - * @return the serialized form of the port. - */ - public static ByteBuffer serializeMetaData(int port) throws IOException { - //TODO these bytes should be versioned - DataOutputBuffer port_dob = new DataOutputBuffer(); - port_dob.writeInt(port); - return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength()); - } - - /** - * A helper function to deserialize the metadata returned by PullServerAuxService. - * @param meta the metadata returned by the PullServerAuxService - * @return the port the PullServer Handler is listening on to serve shuffle data. - */ - public static int deserializeMetaData(ByteBuffer meta) throws IOException { - //TODO this should be returning a class not just an int - DataInputByteBuffer in = new DataInputByteBuffer(); - in.reset(meta); - return in.readInt(); - } - - @Override - public void initializeApplication(ApplicationInitializationContext appInitContext) { - // TODO these bytes should be versioned - // TODO: Once SHuffle is out of NM, this can use MR APIs - this.appId = appInitContext.getApplicationId(); - this.queryId = TajoIdUtils.parseQueryId(appId.toString()); - this.userName = appInitContext.getUser(); - userRsrc.put(this.appId.toString(), this.userName); - } - - @Override - public void stopApplication(ApplicationTerminationContext appStopContext) { - userRsrc.remove(appStopContext.getApplicationId().toString()); - } - - @Override - public synchronized void init(Configuration conf) { - try { - manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, - DEFAULT_SHUFFLE_MANAGE_OS_CACHE); - - readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, - DEFAULT_SHUFFLE_READAHEAD_BYTES); - - selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", 0) - .option(ChannelOption.TCP_NODELAY, true) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .childOption(ChannelOption.TCP_NODELAY, true); - - localFS = new LocalFileSystem(); - super.init(new Configuration(conf)); - } catch (Throwable t) { - LOG.error(t, t); - } - } - - // TODO change AbstractService to throw InterruptedException - @Override - public synchronized void start() { - Configuration conf = getConfig(); - ServerBootstrap bootstrap = selector.clone(); - try { - initializer = new HttpChannelInitializer(conf); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - bootstrap.channel(NioServerSocketChannel.class) - .handler(initializer); - port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, - ConfVars.PULLSERVER_PORT.defaultIntVal); - ChannelFuture future = bootstrap.bind(new InetSocketAddress(port)) - .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE) - .syncUninterruptibly(); - accepted.add(future.channel()); - port = ((InetSocketAddress)future.channel().localAddress()).getPort(); - conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); - initializer.PullServer.setPort(port); - LOG.info(getName() + " listening on port " + port); - super.start(); - - sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, - DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE); - } - - public int getPort() { - return port; - } - - @Override - public synchronized void stop() { - try { - accepted.close(); - if (selector != null) { - if (selector.group() != null) { - selector.group().shutdownGracefully(); - } - if (selector.childGroup() != null) { - selector.childGroup().shutdownGracefully(); - } - } - - if (initializer != null) { - initializer.destroy(); - } - - localFS.close(); - } catch (Throwable t) { - LOG.error(t, t); - } finally { - super.stop(); - } - } - - @Override - public synchronized ByteBuffer getMetaData() { - try { - return serializeMetaData(port); - } catch (IOException e) { - LOG.error("Error during getMeta", e); - // TODO add API to AuxiliaryServices to report failures - return null; - } - } - - class HttpChannelInitializer extends ChannelInitializer { - - final PullServer PullServer; - private SSLFactory sslFactory; - - public HttpChannelInitializer(Configuration conf) throws Exception { - PullServer = new PullServer(conf); - if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname, - ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) { - sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf); - sslFactory.init(); - } - } - - public void destroy() { - if (sslFactory != null) { - sslFactory.destroy(); - } - } - - @Override - protected void initChannel(Channel channel) throws Exception { - ChannelPipeline pipeline = channel.pipeline(); - if (sslFactory != null) { - pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); - } - - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("decoder", new HttpRequestDecoder()); - pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16)); - pipeline.addLast("chunking", new ChunkedWriteHandler()); - pipeline.addLast("shuffle", PullServer); - // TODO factor security manager into pipeline - // TODO factor out encode/decode to permit binary shuffle - // TODO factor out decode of index to permit alt. models - } - } - - @ChannelHandler.Sharable - class PullServer extends SimpleChannelInboundHandler { - private final Configuration conf; - private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); - private int port; - - public PullServer(Configuration conf) { - this.conf = conf; - this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal); - } - - public void setPort(int port) { - this.port = port; - } - - private List splitMaps(List mapq) { - if (null == mapq) { - return null; - } - final List ret = new ArrayList(); - for (String s : mapq) { - Collections.addAll(ret, s.split(",")); - } - return ret; - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) - throws Exception { - if (request.getMethod() != HttpMethod.GET) { - sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); - return; - } - - // Parsing the URL into key-values - final Map> params = new QueryStringDecoder(request.getUri()).parameters(); - final List types = params.get("type"); - final List taskIdList = params.get("ta"); - final List stageIds = params.get("sid"); - final List partitionIds = params.get("p"); - - if (types == null || taskIdList == null || stageIds == null || partitionIds == null) { - sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST); - return; - } - - if (types.size() != 1 || stageIds.size() != 1) { - sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST); - return; - } - - final List chunks = Lists.newArrayList(); - - String repartitionType = types.get(0); - String sid = stageIds.get(0); - String partitionId = partitionIds.get(0); - List taskIds = splitMaps(taskIdList); - - // the working dir of tajo worker for each query - String queryBaseDir = queryId + "/output" + "/"; - - LOG.info("PullServer request param: repartitionType=" + repartitionType + ", sid=" + sid + ", partitionId=" - + partitionId + ", taskIds=" + taskIdList); - - String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname); - if (taskLocalDir == null || taskLocalDir.equals("")) { - LOG.error("Tajo local directory should be specified."); - } - LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir); - - // if a stage requires a range partitioning - if (repartitionType.equals("r")) { - String ta = taskIds.get(0); - Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta - + "/output/", conf)); - - String startKey = params.get("start").get(0); - String endKey = params.get("end").get(0); - boolean last = params.get("final") != null; - - FileChunk chunk; - try { - chunk = getFileCunks(path, startKey, endKey, last); - } catch (Throwable t) { - LOG.error("ERROR Request: " + request.getUri(), t); - sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST); - return; - } - if (chunk != null) { - chunks.add(chunk); - } - - // if a stage requires a hash repartition or a scattered hash - // repartition - } else if (repartitionType.equals("h") || repartitionType.equals("s")) { - for (String ta : taskIds) { - Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta - + "/output/" + partitionId, conf)); - File file = new File(path.toUri()); - FileChunk chunk = new FileChunk(file, 0, file.length()); - chunks.add(chunk); - } - } else { - LOG.error("Unknown repartition type: " + repartitionType); - return; - } - - // Write the content. - if (chunks.size() == 0) { - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); - - if (!HttpHeaders.isKeepAlive(request)) { - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } else { - response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - ctx.writeAndFlush(response); - } - } else { - FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); - ChannelFuture writeFuture = null; - HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - long totalSize = 0; - for (FileChunk chunk : file) { - totalSize += chunk.length(); - } - HttpHeaders.setContentLength(response, totalSize); - - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - // Write the initial line and the header. - writeFuture = ctx.write(response); - - for (FileChunk chunk : file) { - writeFuture = sendFile(ctx, chunk); - if (writeFuture == null) { - sendError(ctx, HttpResponseStatus.NOT_FOUND); - return; - } - } - if (ctx.pipeline().get(SslHandler.class) == null) { - writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - } else { - ctx.flush(); - } - - // Decide whether to close the connection or not. - if (!HttpHeaders.isKeepAlive(request)) { - // Close the connection when the whole content is written out. - writeFuture.addListener(ChannelFutureListener.CLOSE); - } - } - - } - - private ChannelFuture sendFile(ChannelHandlerContext ctx, - FileChunk file) throws IOException { - RandomAccessFile spill; - try { - spill = new RandomAccessFile(file.getFile(), "r"); - } catch (FileNotFoundException e) { - LOG.info(file.getFile() + " not found"); - return null; - } - - ChannelFuture lastContentFuture; - if (ctx.pipeline().get(SslHandler.class) == null) { - final FadvisedFileRegion partition = new FadvisedFileRegion(spill, - file.startOffset(), file.length(), manageOsCache, readaheadLength, - readaheadPool, file.getFile().getAbsolutePath()); - lastContentFuture = ctx.write(partition); - lastContentFuture.addListener(new FileCloseListener(partition, null, 0, null)); - } else { - // HTTPS cannot be done with zero copy. - final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, - file.startOffset(), file.length(), sslFileBufferSize, - manageOsCache, readaheadLength, readaheadPool, - file.getFile().getAbsolutePath()); - lastContentFuture = ctx.write(new HttpChunkedInput(chunk)); - } - metrics.shuffleConnections.incr(); - metrics.shuffleOutputBytes.incr(file.length()); // optimistic - return lastContentFuture; - } - - private void sendError(ChannelHandlerContext ctx, - HttpResponseStatus status) { - sendError(ctx, "", status); - } - - private void sendError(ChannelHandlerContext ctx, String message, - HttpResponseStatus status) { - FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, - Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); - response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); - - // Close the connection as soon as the error message is sent. - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { - Channel ch = ctx.channel(); - if (cause instanceof TooLongFrameException) { - sendError(ctx, HttpResponseStatus.BAD_REQUEST); - return; - } - - LOG.error("PullServer error: ", cause); - if (ch.isActive()) { - sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); - } - } - } - - public FileChunk getFileCunks(Path outDir, - String startKey, - String endKey, - boolean last) throws IOException { - BSTIndex index = new BSTIndex(new TajoConf()); - BSTIndex.BSTIndexReader idxReader = - index.getIndexReader(new Path(outDir, "index")); - idxReader.open(); - Schema keySchema = idxReader.getKeySchema(); - TupleComparator comparator = idxReader.getComparator(); - - LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " - + idxReader.getLastKey()); - - File data = new File(URI.create(outDir.toUri() + "/output")); - byte [] startBytes = Base64.decodeBase64(startKey); - byte [] endBytes = Base64.decodeBase64(endKey); - - RowStoreDecoder decoder = RowStoreUtil.createDecoder(keySchema); - Tuple start; - Tuple end; - try { - start = decoder.toTuple(startBytes); - } catch (Throwable t) { - throw new IllegalArgumentException("StartKey: " + startKey - + ", decoded byte size: " + startBytes.length, t); - } - - try { - end = decoder.toTuple(endBytes); - } catch (Throwable t) { - throw new IllegalArgumentException("EndKey: " + endKey - + ", decoded byte size: " + endBytes.length, t); - } - - - if(!comparator.isAscendingFirstKey()) { - Tuple tmpKey = start; - start = end; - end = tmpKey; - } - - LOG.info("GET Request for " + data.getAbsolutePath() + " (start="+start+", end="+ end + - (last ? ", last=true" : "") + ")"); - - if (idxReader.getFirstKey() == null && idxReader.getLastKey() == null) { // if # of rows is zero - LOG.info("There is no contents"); - return null; - } - - if (comparator.compare(end, idxReader.getFirstKey()) < 0 || - comparator.compare(idxReader.getLastKey(), start) < 0) { - LOG.info("Out of Scope (indexed data [" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + - "], but request start:" + start + ", end: " + end); - return null; - } - - long startOffset; - long endOffset; - try { - startOffset = idxReader.find(start); - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " - + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - try { - endOffset = idxReader.find(end); - if (endOffset == -1) { - endOffset = idxReader.find(end, true); - } - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " - + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - - // if startOffset == -1 then case 2-1 or case 3 - if (startOffset == -1) { // this is a hack - // if case 2-1 or case 3 - try { - startOffset = idxReader.find(start, true); - } catch (IOException ioe) { - LOG.error("State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " - + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - throw ioe; - } - } - - if (startOffset == -1) { - throw new IllegalStateException("startOffset " + startOffset + " is negative \n" + - "State Dump (the requested range: " - + "[" + start + ", " + end +")" + ", idx min: " + idxReader.getFirstKey() + ", idx max: " - + idxReader.getLastKey()); - } - - // if greater than indexed values - if (last || (endOffset == -1 - && comparator.compare(idxReader.getLastKey(), end) < 0)) { - endOffset = data.length(); - } - - FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); - LOG.info("Retrieve File Chunk: " + chunk); - return chunk; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/968633ff/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java index d030eed..4609712 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServer.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.service.CompositeService; import org.apache.tajo.TajoConstants; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.pullserver.PullServerAuxService.PullServer; import org.apache.tajo.util.StringUtils; public class TajoPullServer extends CompositeService { @@ -40,6 +39,9 @@ public class TajoPullServer extends CompositeService { @Override public void init(Configuration conf) { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } this.systemConf = (TajoConf)conf; pullService = new TajoPullServerService(); addService(pullService); @@ -58,7 +60,7 @@ public class TajoPullServer extends CompositeService { } public static void main(String[] args) throws Exception { - StringUtils.startupShutdownMessage(PullServer.class, args, LOG); + StringUtils.startupShutdownMessage(TajoPullServerService.PullServer.class, args, LOG); if (!TajoPullServerService.isStandalone()) { LOG.fatal("TAJO_PULLSERVER_STANDALONE env variable is not 'true'");