Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id F0CECC4B4 for ; Fri, 9 Jan 2015 11:46:50 +0000 (UTC) Received: (qmail 75470 invoked by uid 500); 9 Jan 2015 11:46:51 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 75429 invoked by uid 500); 9 Jan 2015 11:46:51 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 75419 invoked by uid 99); 9 Jan 2015 11:46:51 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Jan 2015 11:46:51 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 0079C92BA53; Fri, 9 Jan 2015 11:46:50 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hyunsik@apache.org To: commits@tajo.apache.org Date: Fri, 09 Jan 2015 11:46:50 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] tajo git commit: TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol. Repository: tajo Updated Branches: refs/heads/master 50a8a663c -> 807868bd4 http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 1ea7051..13394f8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; @@ -838,22 +837,6 @@ public class Stage implements EventHandler { } /** - * Getting the total memory of cluster - * - * @param stage - * @return mega bytes - */ - private static int getClusterTotalMemory(Stage stage) { - List workers = - stage.context.getQueryMasterContext().getQueryMaster().getAllWorker(); - - int totalMem = 0; - for (TajoMasterProtocol.WorkerResourceProto worker : workers) { - totalMem += worker.getMemoryMB(); - } - return totalMem; - } - /** * Getting the desire number of partitions according to the volume of input data. * This method is only used to determine the partition key number of hash join or aggregation. * http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index d711258..82fb37f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -25,7 +25,7 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.ha.HAService; -import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Stage; http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 8241478..04b65d2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; @@ -38,16 +40,18 @@ import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; import org.apache.tajo.master.event.StageContainerAllocationEvent; +import org.apache.tajo.master.rm.TajoWorkerContainer; +import org.apache.tajo.master.rm.TajoWorkerContainerId; +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.master.rm.WorkerResource; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; -import org.apache.tajo.master.rm.*; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.util.ApplicationIdUtils; -import org.apache.tajo.ha.HAServiceUtil; import java.net.InetSocketAddress; import java.util.*; @@ -91,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { int memoryMBPerTask) { //TODO consider disk slot - TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource(); + ClusterResourceSummary clusterResource = workerContext.getClusterResource(); int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask; clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks + @@ -249,20 +253,19 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { @Override public void run() { LOG.info("Start TajoWorkerAllocationThread"); - CallFuture callBack = - new CallFuture(); + CallFuture callBack = + new CallFuture(); //TODO consider task's resource usage pattern int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY); float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK); - TajoMasterProtocol.WorkerResourceAllocationRequest request = - TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder() + WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() .setMinMemoryMBPerContainer(requiredMemoryMB) .setMaxMemoryMBPerContainer(requiredMemoryMB) .setNumContainers(event.getRequiredNum()) - .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY - : TajoMasterProtocol.ResourceRequestPriority.DISK) + .setResourceRequestPriority(!event.isLeafQuery() ? + ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK) .setMinDiskSlotPerContainer(requiredDiskSlots) .setMaxDiskSlotPerContainer(requiredDiskSlots) .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) @@ -280,7 +283,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { try { tmClient = connPool.getConnection( queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } catch (Exception e) { queryTaskContext.getQueryMasterContext().getWorkerContext(). setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf)); @@ -288,15 +291,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf)); tmClient = connPool.getConnection( queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } else { tmClient = connPool.getConnection( queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -304,7 +307,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { connPool.releaseConnection(tmClient); } - TajoMasterProtocol.WorkerResourceAllocationResponse response = null; + WorkerResourceAllocationResponse response = null; while(!stopped.get()) { try { response = callBack.get(3, TimeUnit.SECONDS); @@ -321,11 +324,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator { int numAllocatedContainers = 0; if(response != null) { - List allocatedResources = response.getWorkerAllocatedResourceList(); + List allocatedResources = response.getWorkerAllocatedResourceList(); ExecutionBlockId executionBlockId = event.getExecutionBlockId(); List containers = new ArrayList(); - for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) { + for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) { TajoWorkerContainer container = new TajoWorkerContainer(); NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(), eachAllocatedResource.getConnectionInfo().getPeerRpcPort()); http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 09a87e0..4003014 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -20,7 +20,6 @@ package org.apache.tajo.worker; import com.codahale.metrics.Gauge; import com.google.common.annotations.VisibleForTesting; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogClient; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.ha.TajoMasterInfo; -import org.apache.tajo.querymaster.QueryMaster; -import org.apache.tajo.querymaster.QueryMasterManagerService; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.pullserver.TajoPullServerService; +import org.apache.tajo.querymaster.QueryMaster; +import org.apache.tajo.querymaster.QueryMasterManagerService; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rule.EvaluationContext; @@ -50,7 +49,10 @@ import org.apache.tajo.rule.EvaluationFailedException; import org.apache.tajo.rule.SelfDiagnosisRuleEngine; import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.util.*; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.JvmPauseMonitor; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.HistoryWriter; import org.apache.tajo.util.metrics.TajoSystemMetrics; @@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService { private AtomicInteger numClusterNodes = new AtomicInteger(); - private TajoMasterProtocol.ClusterResourceSummary clusterResource; + private ClusterResourceSummary clusterResource; private WorkerConnectionInfo connectionInfo; @@ -516,13 +518,13 @@ public class TajoWorker extends CompositeService { return TajoWorker.this.numClusterNodes.get(); } - public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) { + public void setClusterResource(ClusterResourceSummary clusterResource) { synchronized (numClusterNodes) { TajoWorker.this.clusterResource = clusterResource; } } - public TajoMasterProtocol.ClusterResourceSummary getClusterResource() { + public ClusterResourceSummary getClusterResource() { synchronized (numClusterNodes) { return TajoWorker.this.clusterResource; } http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index c809921..b92c4cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -26,7 +26,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ha.HAServiceUtil; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; @@ -35,7 +38,6 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.DiskDeviceInfo; import org.apache.tajo.storage.DiskMountInfo; import org.apache.tajo.storage.DiskUtil; -import org.apache.tajo.ha.HAServiceUtil; import java.io.File; import java.util.List; @@ -98,8 +100,8 @@ public class WorkerHeartbeatService extends AbstractService { class WorkerHeartbeatThread extends Thread { private volatile AtomicBoolean stopped = new AtomicBoolean(false); - TajoMasterProtocol.ServerStatusProto.System systemInfo; - List diskInfos = Lists.newArrayList(); + ServerStatusProto.System systemInfo; + List diskInfos = Lists.newArrayList(); float workerDiskSlots; int workerMemoryMB; List diskDeviceInfos; @@ -137,7 +139,7 @@ public class WorkerHeartbeatService extends AbstractService { } } - systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder() + systemInfo = ServerStatusProto.System.newBuilder() .setAvailableProcessors(workerCpuCoreNum) .setFreeMemoryMB(0) .setMaxMemoryMB(0) @@ -153,14 +155,14 @@ public class WorkerHeartbeatService extends AbstractService { if(sendDiskInfoCount == 0 && diskDeviceInfos != null) { getDiskUsageInfos(); } - TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap = - TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder() + ServerStatusProto.JvmHeap jvmHeap = + ServerStatusProto.JvmHeap.newBuilder() .setMaxHeap(Runtime.getRuntime().maxMemory()) .setFreeHeap(Runtime.getRuntime().freeMemory()) .setTotalHeap(Runtime.getRuntime().totalMemory()) .build(); - TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder() + ServerStatusProto serverStatus = ServerStatusProto.newBuilder() .addAllDisk(diskInfos) .setRunningTaskNum( context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks()) @@ -179,8 +181,7 @@ public class WorkerHeartbeatService extends AbstractService { NettyClientBase rmClient = null; try { - CallFuture callBack = - new CallFuture(); + CallFuture callBack = new CallFuture(); // In TajoMaster HA mode, if backup master be active status, // worker may fail to connect existing active master. Thus, @@ -201,9 +202,9 @@ public class WorkerHeartbeatService extends AbstractService { TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub(); resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); - TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS); + TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS); if(response != null) { - TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary(); + ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary(); if(clusterResourceSummary.getNumWorkers() > 0) { context.setNumClusterNodes(clusterResourceSummary.getNumWorkers()); } @@ -249,7 +250,7 @@ public class WorkerHeartbeatService extends AbstractService { if(mountInfos != null) { for(DiskMountInfo eachMount: mountInfos) { File eachFile = new File(eachMount.getMountPath()); - diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder() + diskInfos.add(ServerStatusProto.Disk.newBuilder() .setAbsolutePath(eachFile.getAbsolutePath()) .setTotalSpace(eachFile.getTotalSpace()) .setFreeSpace(eachFile.getFreeSpace()) http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index 6eb710a..68890e3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -18,23 +18,19 @@ package org.apache.tajo.worker.rule; -import java.net.InetSocketAddress; - import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.rule.EvaluationContext; -import org.apache.tajo.rule.EvaluationResult; -import org.apache.tajo.rule.SelfDiagnosisRuleDefinition; -import org.apache.tajo.rule.SelfDiagnosisRuleVisibility; +import org.apache.tajo.rule.*; import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; -import org.apache.tajo.rule.SelfDiagnosisRule; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; +import java.net.InetSocketAddress; + /** * With this rule, Tajo worker will check the connectivity to tajo master server. */ @@ -54,7 +50,7 @@ public class ConnectivityCheckerRuleForTajoWorker implements SelfDiagnosisRule { } else { masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); } - masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true); + masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true); masterClient.getStub(); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto new file mode 100644 index 0000000..41a382f --- /dev/null +++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//TajoWorker -> TajoMaster protocol + +option java_package = "org.apache.tajo.ipc"; +option java_outer_classname = "QueryCoordinatorProtocol"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +import "yarn_protos.proto"; +import "tajo_protos.proto"; +import "TajoIdProtos.proto"; +import "CatalogProtos.proto"; +import "PrimitiveProtos.proto"; +import "ContainerProtocol.proto"; + +package hadoop.yarn; + +message ServerStatusProto { + message System { + required int32 availableProcessors = 1; + required int32 freeMemoryMB = 2; + required int32 maxMemoryMB = 3; + required int32 totalMemoryMB = 4; + } + message Disk { + required string absolutePath = 1; + required int64 totalSpace = 2; + required int64 freeSpace = 3; + required int64 usableSpace = 4; + } + + message JvmHeap { + required int64 maxHeap = 1; + required int64 totalHeap = 2; + required int64 freeHeap = 3; + } + + required System system = 1; + required float diskSlots = 2; + required int32 memoryResourceMB = 3; + repeated Disk disk = 4; + required int32 runningTaskNum = 5; + required JvmHeap jvmHeap = 6; + required BoolProto queryMasterMode = 7; + required BoolProto taskRunnerMode = 8; +} + +message TajoHeartbeat { + required WorkerConnectionInfoProto connectionInfo = 1; + optional QueryIdProto queryId = 2; + optional QueryState state = 3; + optional TableDescProto resultDesc = 4; + optional string statusMessage = 5; + optional float queryProgress = 6; + optional int64 queryFinishTime = 7; +} + +message TajoHeartbeatResponse { + message ResponseCommand { + required string command = 1; + repeated string params = 2; + } + required BoolProto heartbeatResult = 1; + required ClusterResourceSummary clusterResourceSummary = 2; + optional ResponseCommand responseCommand = 3; +} + +message ClusterResourceSummary { + required int32 numWorkers = 1; + required int32 totalDiskSlots = 2; + required int32 totalCpuCoreSlots = 3; + required int32 totalMemoryMB = 4; + + required int32 totalAvailableDiskSlots = 5; + required int32 totalAvailableCpuCoreSlots = 6; + required int32 totalAvailableMemoryMB = 7; +} + +enum ResourceRequestPriority { + MEMORY = 1; + DISK = 2; +} + +message WorkerResourceAllocationRequest { + required QueryIdProto queryId = 1; + required ResourceRequestPriority resourceRequestPriority = 2; + + required int32 numContainers = 3; + + required int32 maxMemoryMBPerContainer = 4; + required int32 minMemoryMBPerContainer = 5; + + required float maxDiskSlotPerContainer = 6; + required float minDiskSlotPerContainer = 7; +} + +message WorkerResourceProto { + required WorkerConnectionInfoProto connectionInfo = 1; + required int32 memoryMB = 2 ; + required float diskSlots = 3; +} + +message WorkerResourcesRequest { + repeated WorkerResourceProto workerResources = 1; +} + +message WorkerResourceReleaseRequest { + required ExecutionBlockIdProto executionBlockId = 1; + repeated TajoContainerIdProto containerIds = 2; +} + +message WorkerAllocatedResource { + required TajoContainerIdProto containerId = 1; + required WorkerConnectionInfoProto connectionInfo = 2; + + required int32 allocatedMemoryMB = 3; + required float allocatedDiskSlots = 4; +} + +message WorkerResourceAllocationResponse { + required QueryIdProto queryId = 1; + repeated WorkerAllocatedResource workerAllocatedResource = 2; +} + +service QueryCoordinatorProtocolService { + rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); + rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); + rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); + rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/ResourceTrackerProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto index b2db46a..40aeab7 100644 --- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto +++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto @@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; -import "TajoMasterProtocol.proto"; +import "QueryCoordinatorProtocol.proto"; import "ContainerProtocol.proto"; import "tajo_protos.proto"; http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/proto/TajoMasterProtocol.proto ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto deleted file mode 100644 index bc73596..0000000 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//TajoWorker -> TajoMaster protocol - -option java_package = "org.apache.tajo.ipc"; -option java_outer_classname = "TajoMasterProtocol"; -option java_generic_services = true; -option java_generate_equals_and_hash = true; - -import "yarn_protos.proto"; -import "tajo_protos.proto"; -import "TajoIdProtos.proto"; -import "CatalogProtos.proto"; -import "PrimitiveProtos.proto"; -import "ContainerProtocol.proto"; - -package hadoop.yarn; - -message ServerStatusProto { - message System { - required int32 availableProcessors = 1; - required int32 freeMemoryMB = 2; - required int32 maxMemoryMB = 3; - required int32 totalMemoryMB = 4; - } - message Disk { - required string absolutePath = 1; - required int64 totalSpace = 2; - required int64 freeSpace = 3; - required int64 usableSpace = 4; - } - - message JvmHeap { - required int64 maxHeap = 1; - required int64 totalHeap = 2; - required int64 freeHeap = 3; - } - - required System system = 1; - required float diskSlots = 2; - required int32 memoryResourceMB = 3; - repeated Disk disk = 4; - required int32 runningTaskNum = 5; - required JvmHeap jvmHeap = 6; - required BoolProto queryMasterMode = 7; - required BoolProto taskRunnerMode = 8; -} - -message TajoHeartbeat { - required WorkerConnectionInfoProto connectionInfo = 1; - optional QueryIdProto queryId = 2; - optional QueryState state = 3; - optional TableDescProto resultDesc = 4; - optional string statusMessage = 5; - optional float queryProgress = 6; - optional int64 queryFinishTime = 7; -} - -message TajoHeartbeatResponse { - message ResponseCommand { - required string command = 1; - repeated string params = 2; - } - required BoolProto heartbeatResult = 1; - required ClusterResourceSummary clusterResourceSummary = 2; - optional ResponseCommand responseCommand = 3; -} - -message ClusterResourceSummary { - required int32 numWorkers = 1; - required int32 totalDiskSlots = 2; - required int32 totalCpuCoreSlots = 3; - required int32 totalMemoryMB = 4; - - required int32 totalAvailableDiskSlots = 5; - required int32 totalAvailableCpuCoreSlots = 6; - required int32 totalAvailableMemoryMB = 7; -} - -enum ResourceRequestPriority { - MEMORY = 1; - DISK = 2; -} - -message WorkerResourceAllocationRequest { - required QueryIdProto queryId = 1; - required ResourceRequestPriority resourceRequestPriority = 2; - - required int32 numContainers = 3; - - required int32 maxMemoryMBPerContainer = 4; - required int32 minMemoryMBPerContainer = 5; - - required float maxDiskSlotPerContainer = 6; - required float minDiskSlotPerContainer = 7; -} - -message WorkerResourceProto { - required WorkerConnectionInfoProto connectionInfo = 1; - required int32 memoryMB = 2 ; - required float diskSlots = 3; -} - -message WorkerResourcesRequest { - repeated WorkerResourceProto workerResources = 1; -} - -message WorkerResourceReleaseRequest { - required ExecutionBlockIdProto executionBlockId = 1; - repeated TajoContainerIdProto containerIds = 2; -} - -message WorkerAllocatedResource { - required TajoContainerIdProto containerId = 1; - required WorkerConnectionInfoProto connectionInfo = 2; - - required int32 allocatedMemoryMB = 3; - required float allocatedDiskSlots = 4; -} - -message WorkerResourceAllocationResponse { - required QueryIdProto queryId = 1; - repeated WorkerAllocatedResource workerAllocatedResource = 2; -} - -service TajoMasterProtocolService { - rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); - rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); - rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); - rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/index.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 00186d7..0defb3c 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -25,7 +25,7 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.ha.HAService" %> <%@ page import="org.apache.tajo.ha.TajoMasterInfo" %> -<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %> +<%@ page import="org.apache.tajo.master.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page import="org.apache.tajo.util.NetUtils" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/main/resources/webapps/admin/query.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 4d8e5e6..85f7176 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -20,7 +20,7 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %> +<%@ page import="org.apache.tajo.master.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.StringUtils" %> http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 0786912..e548b81 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -43,8 +43,8 @@ import org.apache.tajo.client.TajoClientUtil; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; +import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.querymaster.*; import org.apache.tajo.querymaster.Query; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; http://git-wip-us.apache.org/repos/asf/tajo/blob/807868bd/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java index b8fbd67..a013d0b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java @@ -19,12 +19,11 @@ package org.apache.tajo.master.rm; import com.google.protobuf.RpcCallback; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol.*; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;