Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 49498185CB for ; Mon, 20 Jul 2015 08:41:34 +0000 (UTC) Received: (qmail 44314 invoked by uid 500); 20 Jul 2015 08:34:20 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 44228 invoked by uid 500); 20 Jul 2015 08:34:20 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 43951 invoked by uid 99); 20 Jul 2015 08:34:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 20 Jul 2015 08:34:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 05D69DFC80; Mon, 20 Jul 2015 08:34:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jhkim@apache.org To: commits@tajo.apache.org Date: Mon, 20 Jul 2015 08:34:27 -0000 Message-Id: <546fd8477f8842fab795141626b15b51@git.apache.org> In-Reply-To: <4111af5c5a304566b7b0b8d0df45e6e1@git.apache.org> References: <4111af5c5a304566b7b0b8d0df45e6e1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/11] tajo git commit: TAJO-1397: Resource allocation should be fine grained. 
(jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index bc0d212..0ace53a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -41,8 +41,7 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.TajoMaster.MasterContext; -import org.apache.tajo.master.rm.Worker; -import org.apache.tajo.master.rm.WorkerResource; +import org.apache.tajo.master.rm.NodeStatus; import org.apache.tajo.plan.InvalidQueryException; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.PlanningException; @@ -50,6 +49,8 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.IndexScanNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; import org.apache.tajo.session.InvalidSessionException; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; @@ -420,40 +421,35 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult return tuples; } - private Tuple getQueryMasterTuple(Schema outSchema, Worker aWorker) { + private Tuple getQueryMasterTuple(Schema outSchema, NodeStatus aNodeStatus) { List columns = outSchema.getRootColumns(); Tuple aTuple = new VTuple(outSchema.size()); - 
WorkerResource aResource = aWorker.getResource(); - + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { Column column = columns.get(fieldId); if ("host".equalsIgnoreCase(column.getSimpleName())) { - if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) { - aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost())); + if (aNodeStatus.getConnectionInfo() != null && aNodeStatus.getConnectionInfo().getHost() != null) { + aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getConnectionInfo().getHost())); } else { aTuple.put(fieldId, DatumFactory.createNullDatum()); } } else if ("port".equalsIgnoreCase(column.getSimpleName())) { - if (aWorker.getConnectionInfo() != null) { - aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getQueryMasterPort())); + if (aNodeStatus.getConnectionInfo() != null) { + aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getConnectionInfo().getQueryMasterPort())); } else { aTuple.put(fieldId, DatumFactory.createNullDatum()); } } else if ("type".equalsIgnoreCase(column.getSimpleName())) { aTuple.put(fieldId, DatumFactory.createText("QueryMaster")); } else if ("status".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString())); - } else if ("RUNNING".equalsIgnoreCase(aWorker.getState().toString())) { + aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getState().toString())); + } else if ("RUNNING".equalsIgnoreCase(aNodeStatus.getState().toString())) { if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumQueryMasterTasks())); - } else if ("free_heap".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap())); - } else if ("max_heap".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap())); + 
aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getNumRunningQueryMaster())); } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) { - if (aWorker.getLastHeartbeatTime() > 0) { - aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime())); + if (aNodeStatus.getLastHeartbeatTime() > 0) { + aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime())); } else { aTuple.put(fieldId, DatumFactory.createNullDatum()); } @@ -466,50 +462,48 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult return aTuple; } - private Tuple getWorkerTuple(Schema outSchema, Worker aWorker) { + private Tuple getWorkerTuple(Schema outSchema, NodeStatus aNodeStatus) { List columns = outSchema.getRootColumns(); Tuple aTuple = new VTuple(outSchema.size()); - WorkerResource aResource = aWorker.getResource(); + + NodeResource total = aNodeStatus.getTotalResourceCapability(); + NodeResource used = NodeResources.subtract(total, aNodeStatus.getAvailableResource()); for (int fieldId = 0; fieldId < columns.size(); fieldId++) { Column column = columns.get(fieldId); if ("host".equalsIgnoreCase(column.getSimpleName())) { - if (aWorker.getConnectionInfo() != null && aWorker.getConnectionInfo().getHost() != null) { - aTuple.put(fieldId, DatumFactory.createText(aWorker.getConnectionInfo().getHost())); + if (aNodeStatus.getConnectionInfo() != null && aNodeStatus.getConnectionInfo().getHost() != null) { + aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getConnectionInfo().getHost())); } else { aTuple.put(fieldId, DatumFactory.createNullDatum()); } } else if ("port".equalsIgnoreCase(column.getSimpleName())) { - if (aWorker.getConnectionInfo() != null) { - aTuple.put(fieldId, DatumFactory.createInt4(aWorker.getConnectionInfo().getPeerRpcPort())); + if (aNodeStatus.getConnectionInfo() != null) { + aTuple.put(fieldId, 
DatumFactory.createInt4(aNodeStatus.getConnectionInfo().getPeerRpcPort())); } else { aTuple.put(fieldId, DatumFactory.createNullDatum()); } } else if ("type".equalsIgnoreCase(column.getSimpleName())) { aTuple.put(fieldId, DatumFactory.createText("Worker")); } else if ("status".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(aWorker.getState().toString())); - } else if ("RUNNING".equalsIgnoreCase(aWorker.getState().toString())) { + aTuple.put(fieldId, DatumFactory.createText(aNodeStatus.getState().toString())); + } else if ("RUNNING".equalsIgnoreCase(aNodeStatus.getState().toString())) { if ("total_cpu".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(aResource.getCpuCoreSlots())); + aTuple.put(fieldId, DatumFactory.createInt4(total.getVirtualCores())); } else if ("used_mem".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(aResource.getUsedMemoryMB()*1048576l)); + aTuple.put(fieldId, DatumFactory.createInt8(used.getMemory() * 1048576l)); } else if ("total_mem".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMemoryMB()*1048576l)); - } else if ("free_heap".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(aResource.getFreeHeap())); - } else if ("max_heap".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt8(aResource.getMaxHeap())); - } else if ("used_diskslots".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getUsedDiskSlots())); - } else if ("total_diskslots".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createFloat4(aResource.getDiskSlots())); + aTuple.put(fieldId, DatumFactory.createInt8(total.getMemory() * 1048576l)); + } else if ("used_disk".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, 
DatumFactory.createInt4(used.getDisks())); + } else if ("total_disk".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(total.getDisks())); } else if ("running_tasks".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(aResource.getNumRunningTasks())); + aTuple.put(fieldId, DatumFactory.createInt4(aNodeStatus.getNumRunningTasks())); } else if ("last_heartbeat_ts".equalsIgnoreCase(column.getSimpleName())) { - if (aWorker.getLastHeartbeatTime() > 0) { - aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aWorker.getLastHeartbeatTime())); + if (aNodeStatus.getLastHeartbeatTime() > 0) { + aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(aNodeStatus.getLastHeartbeatTime())); } else { aTuple.put(fieldId, DatumFactory.createNullDatum()); } @@ -524,23 +518,23 @@ public class NonForwardQueryResultSystemScanner implements NonForwardQueryResult } private List getClusterInfo(Schema outSchema) { - Map workerMap = masterContext.getResourceManager().getWorkers(); + Map workerMap = masterContext.getResourceManager().getNodes(); List tuples; - List queryMasterList = new ArrayList(); - List workerList = new ArrayList(); + List queryMasterList = new ArrayList(); + List nodeStatusList = new ArrayList(); - for (Worker aWorker: workerMap.values()) { - queryMasterList.add(aWorker); - workerList.add(aWorker); + for (NodeStatus aNodeStatus : workerMap.values()) { + queryMasterList.add(aNodeStatus); + nodeStatusList.add(aNodeStatus); } - tuples = new ArrayList(queryMasterList.size() + workerList.size()); - for (Worker queryMaster: queryMasterList) { + tuples = new ArrayList(queryMasterList.size() + nodeStatusList.size()); + for (NodeStatus queryMaster: queryMasterList) { tuples.add(getQueryMasterTuple(outSchema, queryMaster)); } - for (Worker worker: workerList) { - tuples.add(getWorkerTuple(outSchema, worker)); + for (NodeStatus nodeStatus : nodeStatusList) { + 
tuples.add(getWorkerTuple(outSchema, nodeStatus)); } return tuples; http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java new file mode 100644 index 0000000..430dc86 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEvent.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.rm; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +/** + * NodeEvent describes all kinds of events which sent to {@link NodeStatus}. 
+ */ +public class NodeEvent extends AbstractEvent { + private final int workerId; + + public NodeEvent(int workerId, NodeEventType nodeEventType) { + super(nodeEventType); + this.workerId = workerId; + } + + public int getWorkerId() { + return workerId; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java new file mode 100644 index 0000000..64de5d4 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeEventType.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.rm; + +public enum NodeEventType { + + /** Source : {@link TajoResourceTracker}, Destination: {@link NodeStatus} */ + STARTED, + STATE_UPDATE, + RECONNECTED, + + /** Source : {@link NodeLivelinessMonitor}, Destination: {@link NodeStatus} */ + EXPIRE +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java new file mode 100644 index 0000000..55a24fc --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeLivelinessMonitor.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.rm; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor; +import org.apache.hadoop.yarn.util.SystemClock; +import org.apache.tajo.conf.TajoConf; + +/** + * It periodically checks the latest heartbeat time of {@link NodeStatus}. + * If the latest heartbeat time is expired, it produces EXPIRE event to a corresponding {@link NodeStatus}. + */ +public class NodeLivelinessMonitor extends AbstractLivelinessMonitor { + + private EventHandler dispatcher; + + public NodeLivelinessMonitor(Dispatcher d) { + super(NodeLivelinessMonitor.class.getSimpleName(), new SystemClock()); + this.dispatcher = d.getEventHandler(); + } + + public void serviceInit(Configuration conf) throws Exception { + Preconditions.checkArgument(conf instanceof TajoConf); + TajoConf systemConf = (TajoConf) conf; + // milliseconds + int expireIntvl = systemConf.getIntVar(TajoConf.ConfVars.RESOURCE_TRACKER_HEARTBEAT_TIMEOUT) * 1000; + setExpireInterval(expireIntvl); + setMonitorInterval(expireIntvl/3); + super.serviceInit(conf); + } + + @Override + protected void expire(Integer id) { + dispatcher.handle(new NodeEvent(id, NodeEventType.EXPIRE)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java new file mode 100644 index 0000000..e9ba077 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeReconnectEvent.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.rm; + +/** + * {@link TajoResourceTracker} produces this event, and it's destination is {@link NodeStatus}. + * This event occurs only when an inactive nodeStatus sends a ping again. + */ +public class NodeReconnectEvent extends NodeEvent { + private final NodeStatus nodeStatus; + public NodeReconnectEvent(int workerId, NodeStatus nodeStatus) { + super(workerId, NodeEventType.RECONNECTED); + this.nodeStatus = nodeStatus; + } + + public NodeStatus getNodeStatus() { + return nodeStatus; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java new file mode 100644 index 0000000..89f601d --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeState.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.rm; + +/** + * It presents the states of {@link NodeStatus}. + */ +public enum NodeState { + /** New node */ + NEW, + + /** Running node */ + RUNNING, + + /** Node is unhealthy */ + UNHEALTHY, + + /** Node is out of service */ + DECOMMISSIONED, + + /** Node has not sent a heartbeat for some configured time threshold */ + LOST; + + @SuppressWarnings("unused") + public boolean isUnusable() { + return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java new file mode 100644 index 0000000..63e4d52 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatus.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.rm; + +import io.netty.util.internal.PlatformDependent; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.state.*; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.util.TUtil; + +import java.util.EnumSet; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +/** + * It contains resource and various information for a node. 
+ */ +public class NodeStatus implements EventHandler, Comparable { + /** class logger */ + private static final Log LOG = LogFactory.getLog(NodeStatus.class); + + /** context of {@link TajoResourceManager} */ + private final TajoRMContext rmContext; + + /** last heartbeat time */ + private volatile long lastHeartbeatTime; + + @SuppressWarnings("unused") + private volatile int numRunningTasks; + + @SuppressWarnings("unused") + private volatile int numRunningQueryMaster; + + private static AtomicLongFieldUpdater HEARTBEAT_TIME_UPDATER; + private static AtomicIntegerFieldUpdater RUNNING_TASK_UPDATER; + private static AtomicIntegerFieldUpdater RUNNING_QM_UPDATER; + + static { + HEARTBEAT_TIME_UPDATER = PlatformDependent.newAtomicLongFieldUpdater(NodeStatus.class, "lastHeartbeatTime"); + if (HEARTBEAT_TIME_UPDATER == null) { + HEARTBEAT_TIME_UPDATER = AtomicLongFieldUpdater.newUpdater(NodeStatus.class, "lastHeartbeatTime"); + RUNNING_TASK_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeStatus.class, "numRunningTasks"); + RUNNING_QM_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeStatus.class, "numRunningQueryMaster"); + } else { + RUNNING_TASK_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeStatus.class, "numRunningTasks"); + RUNNING_QM_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeStatus.class, "numRunningQueryMaster"); + } + } + + /** Available resources on the node. */ + private final NodeResource availableResource; + + /** Total resources on the node. 
*/ + private final NodeResource totalResourceCapability; + + /** Node connection information */ + private WorkerConnectionInfo connectionInfo; + + private static final ReconnectNodeTransition RECONNECT_NODE_TRANSITION = new ReconnectNodeTransition(); + private static final StatusUpdateTransition STATUS_UPDATE_TRANSITION = new StatusUpdateTransition(); + + private static final StateMachineFactory stateMachineFactory + = new StateMachineFactory(NodeState.NEW) + + // Transition from NEW + .addTransition(NodeState.NEW, NodeState.RUNNING, + NodeEventType.STARTED, + new AddNodeTransition()) + + // Transition from RUNNING + .addTransition(NodeState.RUNNING, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), + NodeEventType.STATE_UPDATE, + STATUS_UPDATE_TRANSITION) + .addTransition(NodeState.RUNNING, NodeState.LOST, + NodeEventType.EXPIRE, + new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.RUNNING, NodeState.RUNNING, + NodeEventType.RECONNECTED, + RECONNECT_NODE_TRANSITION) + + // Transitions from UNHEALTHY state + .addTransition(NodeState.UNHEALTHY, EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY), + NodeEventType.STATE_UPDATE, + STATUS_UPDATE_TRANSITION) + .addTransition(NodeState.UNHEALTHY, NodeState.LOST, + NodeEventType.EXPIRE, + new DeactivateNodeTransition(NodeState.LOST)) + .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, + NodeEventType.RECONNECTED, + RECONNECT_NODE_TRANSITION); + + private final StateMachine stateMachine = + stateMachineFactory.make(this, NodeState.NEW); + + public NodeStatus(TajoRMContext rmContext, NodeResource totalResourceCapability, WorkerConnectionInfo connectionInfo) { + this.rmContext = rmContext; + + this.connectionInfo = connectionInfo; + this.lastHeartbeatTime = System.currentTimeMillis(); + this.totalResourceCapability = totalResourceCapability; + this.availableResource = NodeResources.clone(totalResourceCapability); + } + + public int getWorkerId() { + return connectionInfo.getId(); + } + + public 
WorkerConnectionInfo getConnectionInfo() { + return connectionInfo; + } + + public void setLastHeartbeatTime(long lastHeartbeatTime) { + HEARTBEAT_TIME_UPDATER.lazySet(this, lastHeartbeatTime); + } + + public void setNumRunningQueryMaster(int numRunningQueryMaster) { + RUNNING_QM_UPDATER.lazySet(this, numRunningQueryMaster); + } + + public int getNumRunningQueryMaster() { + return numRunningQueryMaster; + } + + public void setNumRunningTasks(int numRunningTasks) { + RUNNING_TASK_UPDATER.lazySet(this, numRunningTasks); + } + + public int getNumRunningTasks() { + return numRunningTasks; + } + + public long getLastHeartbeatTime() { + return this.lastHeartbeatTime; + } + + /** + * + * @return the current state of node + */ + public NodeState getState() { + return this.stateMachine.getCurrentState(); + } + + /** + * Get current resources on the node. + * + * @return current resources on the node. + */ + public NodeResource getAvailableResource() { + return this.availableResource; + } + + /** + * Get total resources on the node. + * + * @return total resources on the node. + */ + public NodeResource getTotalResourceCapability() { + return totalResourceCapability; + } + + @Override + public int compareTo(NodeStatus o) { + if(o == null) { + return 1; + } + return connectionInfo.compareTo(o.connectionInfo); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + NodeStatus nodeStatus = (NodeStatus) o; + + if (connectionInfo != null ? !connectionInfo.equals(nodeStatus.connectionInfo) : nodeStatus.connectionInfo != null) + return false; + return true; + } + + @Override + public int hashCode() { + int result = 0; + result = 31 * result + (connectionInfo != null ? 
connectionInfo.hashCode() : 0); + return result; + } + + public static class AddNodeTransition implements SingleArcTransition { + @Override + public void transition(NodeStatus nodeStatus, NodeEvent nodeEvent) { + + nodeStatus.rmContext.getQueryMasterWorker().add(nodeStatus.getWorkerId()); + LOG.info("Node with " + nodeStatus.getTotalResourceCapability() + " is joined to Tajo cluster"); + } + } + + public static class StatusUpdateTransition implements + MultipleArcTransition { + + @Override + public NodeState transition(NodeStatus nodeStatus, NodeEvent event) { + + NodeStatusEvent statusEvent = TUtil.checkTypeAndGet(event, NodeStatusEvent.class); + nodeStatus.updateStatus(statusEvent); + + return NodeState.RUNNING; + } + } + + private void updateStatus(NodeStatusEvent statusEvent) { + setLastHeartbeatTime(System.currentTimeMillis()); + setNumRunningTasks(statusEvent.getRunningTaskNum()); + setNumRunningQueryMaster(statusEvent.getRunningQMNum()); + NodeResources.update(availableResource, statusEvent.getAvailableResource()); + + if(statusEvent.getTotalResource() != null) { + NodeResources.update(totalResourceCapability, statusEvent.getTotalResource()); + } + } + + public static class DeactivateNodeTransition implements SingleArcTransition { + private final NodeState finalState; + + public DeactivateNodeTransition(NodeState finalState) { + this.finalState = finalState; + } + + @Override + public void transition(NodeStatus nodeStatus, NodeEvent nodeEvent) { + + nodeStatus.rmContext.getNodes().remove(nodeStatus.getWorkerId()); + LOG.info("Deactivating Node " + nodeStatus.getWorkerId() + " as it is now " + finalState); + nodeStatus.rmContext.getInactiveNodes().putIfAbsent(nodeStatus.getWorkerId(), nodeStatus); + } + } + + public static class ReconnectNodeTransition implements SingleArcTransition { + + @Override + public void transition(NodeStatus nodeStatus, NodeEvent nodeEvent) { + + NodeReconnectEvent castedEvent = TUtil.checkTypeAndGet(nodeEvent, 
NodeReconnectEvent.class); + NodeStatus newNodeStatus = castedEvent.getNodeStatus(); + nodeStatus.rmContext.getNodes().put(castedEvent.getWorkerId(), newNodeStatus); + nodeStatus.rmContext.getDispatcher().getEventHandler().handle( + new NodeEvent(nodeStatus.getWorkerId(), NodeEventType.STARTED)); + } + } + + @Override + public void handle(NodeEvent event) { + LOG.debug("Processing " + event.getWorkerId() + " of type " + event.getType()); + NodeState oldState = getState(); + try { + stateMachine.doTransition(event.getType(), event); + } catch (InvalidStateTransitonException e) { + LOG.error("Can't handle this event at current state" + + ", eventType:" + event.getType().name() + + ", oldState:" + oldState.name() + + ", nextState:" + getState().name() + , e); + LOG.error("Invalid event " + event.getType() + " on NodeStatus " + getWorkerId()); + } + if (oldState != getState()) { + LOG.info(getWorkerId() + " Node Transitioned from " + oldState + " to " + getState()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java new file mode 100644 index 0000000..db2f167 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/NodeStatusEvent.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.rm; + +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.resource.NodeResource; + +/** + * {@link TajoResourceTracker} produces this event, and its destination is + * {@link NodeStatus.StatusUpdateTransition} of {@link NodeStatus}. + */ +public class NodeStatusEvent extends NodeEvent { + private final int runningTaskNum; + private final int runningQMNum; + private final NodeResource available; + private final NodeResource total; + + public NodeStatusEvent(int workerId, int runningTaskNum, int runningQMNum, + NodeResource available, @Nullable NodeResource total) { + super(workerId, NodeEventType.STATE_UPDATE); + this.runningTaskNum = runningTaskNum; + this.runningQMNum = runningQMNum; + this.available = available; + this.total = total; + } + + public int getRunningTaskNum() { + return runningTaskNum; + } + + public int getRunningQMNum() { + return runningQMNum; + } + + public NodeResource getAvailableResource() { + return available; + } + + public NodeResource getTotalResource() { + return total; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java index bb8cc12..9dfae7e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java +++ 
b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoRMContext.java @@ -20,8 +20,6 @@ package org.apache.tajo.master.rm; import com.google.common.collect.Maps; import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.tajo.QueryId; -import org.apache.tajo.ipc.ContainerProtocol; import java.util.Collections; import java.util.Set; @@ -29,27 +27,21 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** - * It's a worker resource manager context. It contains all context data about TajoWorkerResourceManager. + * It's a worker resource manager context. It contains all context data about TajoResourceManager. */ public class TajoRMContext { final Dispatcher rmDispatcher; - /** map between workerIds and running workers */ - private final ConcurrentMap workers = Maps.newConcurrentMap(); + /** map between workerIds and running nodes */ + private final ConcurrentMap nodes = Maps.newConcurrentMap(); - /** map between workerIds and inactive workers */ - private final ConcurrentMap inactiveWorkers = Maps.newConcurrentMap(); - - /** map between queryIds and query master ContainerId */ - private final ConcurrentMap qmContainerMap = Maps - .newConcurrentMap(); + /** map between workerIds and inactive nodes */ + private final ConcurrentMap inactiveNodes = Maps.newConcurrentMap(); private final Set liveQueryMasterWorkerResources = Collections.newSetFromMap(new ConcurrentHashMap()); - private final Set stoppedQueryIds = - Collections.newSetFromMap(new ConcurrentHashMap()); public TajoRMContext(Dispatcher dispatcher) { this.rmDispatcher = dispatcher; @@ -60,32 +52,20 @@ public class TajoRMContext { } /** - * @return The Map for active workers + * @return The Map for active nodes */ - public ConcurrentMap getWorkers() { - return workers; + public ConcurrentMap getNodes() { + return nodes; } /** - * @return The Map for inactive workers + * @return The Map for inactive nodes */ - public ConcurrentMap getInactiveWorkers() { - return 
inactiveWorkers; - } - - /** - * - * @return The Map for query master containers - */ - public ConcurrentMap getQueryMasterContainer() { - return qmContainerMap; + public ConcurrentMap getInactiveNodes() { + return inactiveNodes; } public Set getQueryMasterWorker() { return liveQueryMasterWorkerResources; } - - public Set getStoppedQueryIds() { - return stoppedQueryIds; - } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java new file mode 100644 index 0000000..c91938a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceManager.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.rm; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.scheduler.AbstractQueryScheduler; +import org.apache.tajo.master.scheduler.QuerySchedulingInfo; +import org.apache.tajo.master.scheduler.event.SchedulerEventType; +import org.apache.tajo.util.TUtil; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +/** + * It manages all resources of tajo workers. + */ +public class TajoResourceManager extends CompositeService { + /** class logger */ + private static final Log LOG = LogFactory.getLog(TajoResourceManager.class); + + protected static final Map> SCHEDULER_CLASS_CACHE = Maps.newHashMap(); + + private TajoMaster.MasterContext masterContext; + + private TajoRMContext rmContext; + + private String queryIdSeed; + + /** + * Node Liveliness monitor + */ + private NodeLivelinessMonitor nodeLivelinessMonitor; + + private TajoConf systemConf; + private AbstractQueryScheduler scheduler; + + /** It receives status messages from workers and their resources. 
*/ + private TajoResourceTracker resourceTracker; + + public TajoResourceManager(TajoMaster.MasterContext masterContext) { + super(TajoResourceManager.class.getSimpleName()); + this.masterContext = masterContext; + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + this.systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + + AsyncDispatcher dispatcher = new AsyncDispatcher(); + addIfService(dispatcher); + + rmContext = new TajoRMContext(dispatcher); + + this.queryIdSeed = String.valueOf(System.currentTimeMillis()); + + this.nodeLivelinessMonitor = new NodeLivelinessMonitor(this.rmContext.getDispatcher()); + addIfService(this.nodeLivelinessMonitor); + + // Register event handler for Workers + rmContext.getDispatcher().register(NodeEventType.class, new WorkerEventDispatcher(rmContext)); + + resourceTracker = new TajoResourceTracker(this, nodeLivelinessMonitor); + addIfService(resourceTracker); + + String schedulerClassName = systemConf.getVar(TajoConf.ConfVars.RESOURCE_SCHEDULER_CLASS); + scheduler = loadScheduler(schedulerClassName); + LOG.info("Loaded resource scheduler : " + scheduler.getClass()); + addIfService(scheduler); + rmContext.getDispatcher().register(SchedulerEventType.class, scheduler); + + super.serviceInit(systemConf); + } + + protected synchronized AbstractQueryScheduler loadScheduler(String schedulerClassName) throws Exception { + Class schedulerClass; + if (SCHEDULER_CLASS_CACHE.containsKey(schedulerClassName)) { + schedulerClass = SCHEDULER_CLASS_CACHE.get(schedulerClassName); + } else { + schedulerClass = (Class) Class.forName(schedulerClassName); + SCHEDULER_CLASS_CACHE.put(schedulerClassName, schedulerClass); + } + + Constructor + constructor = schedulerClass.getDeclaredConstructor(new Class[]{TajoMaster.MasterContext.class}); + constructor.setAccessible(true); + + return constructor.newInstance(new Object[]{masterContext}); + } + + @InterfaceAudience.Private + public static final class 
WorkerEventDispatcher implements EventHandler { + + private final TajoRMContext rmContext; + + public WorkerEventDispatcher(TajoRMContext rmContext) { + this.rmContext = rmContext; + } + + @Override + public void handle(NodeEvent event) { + int workerId = event.getWorkerId(); + NodeStatus node = this.rmContext.getNodes().get(workerId); + if (node != null) { + try { + node.handle(event); + } catch (Throwable t) { + LOG.error("Error in handling event type " + event.getType() + " for node " + workerId, t); + } + } + } + } + + + public Map getNodes() { + return ImmutableMap.copyOf(rmContext.getNodes()); + } + + public Map getInactiveNodes() { + return ImmutableMap.copyOf(rmContext.getInactiveNodes()); + } + + public Collection getQueryMasters() { + return Collections.unmodifiableSet(rmContext.getQueryMasterWorker()); + } + + @Override + public void serviceStop() throws Exception { + super.serviceStop(); + } + + /** + * + * @return The prefix of queryId. It is generated when a TajoMaster starts up. 
+ */ + public String getSeedQueryId() throws IOException { + return queryIdSeed; + } + + @VisibleForTesting + TajoResourceTracker getResourceTracker() { + return resourceTracker; + } + + public AbstractQueryScheduler getScheduler() { + return scheduler; + } + + public void submitQuery(QuerySchedulingInfo schedulingInfo) { + scheduler.submitQuery(schedulingInfo); + } + + public TajoRMContext getRMContext() { + return rmContext; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index 2a18de7..3e3e3b4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -20,79 +20,76 @@ package org.apache.tajo.master.rm; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; -import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; +import org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.scheduler.event.SchedulerEvent; +import org.apache.tajo.master.scheduler.event.SchedulerEventType; +import org.apache.tajo.resource.NodeResource; import org.apache.tajo.rpc.AsyncRpcServer; import 
org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.TUtil; -import java.io.IOError; import java.net.InetSocketAddress; -import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat; -import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService; +import static org.apache.tajo.ResourceProtos.*; /** - * It receives pings that workers periodically send. The ping messages contains the worker resources and their statuses. - * From ping messages, {@link TajoResourceTracker} tracks the recent status of all workers. + * It receives pings that nodes periodically send. The ping messages contains the node resources and their statuses. + * From ping messages, {@link TajoResourceTracker} tracks the recent status of all nodes. * * In detail, it has two main roles as follows: * *
<ul>
 *   <li>Membership management for nodes which join to a Tajo cluster</li>
 *   <ul>
- *    <li>Register - It receives the ping from a new worker. It registers the worker.</li>
- *    <li>Unregister - It unregisters a worker who does not send ping for some expiry time.</li>
+ *    <li>Register - It receives the ping from a new node. It registers the node.</li>
+ *    <li>Unregister - It unregisters a node who does not send ping for some expiry time.</li>
 *   </ul>
- *   <li>Status Update - It updates the status of all participating workers</li>
+ *   <li>Status Update - It updates the status of all participating nodes</li>
 * </ul>
      */ public class TajoResourceTracker extends AbstractService implements TajoResourceTrackerProtocolService.Interface { /** Class logger */ private Log LOG = LogFactory.getLog(TajoResourceTracker.class); - private final WorkerResourceManager manager; - /** the context of TajoWorkerResourceManager */ + private final TajoResourceManager manager; + /** the context of TajoResourceManager */ private final TajoRMContext rmContext; - /** Liveliness monitor which checks ping expiry times of workers */ - private final WorkerLivelinessMonitor workerLivelinessMonitor; + /** Liveliness monitor which checks ping expiry times of nodes */ + private final NodeLivelinessMonitor nodeLivelinessMonitor; - /** RPC server for worker resource tracker */ + /** RPC server for node resource tracker */ private AsyncRpcServer server; - /** The bind address of RPC server of worker resource tracker */ + /** The bind address of RPC server of node resource tracker */ private InetSocketAddress bindAddress; - public TajoResourceTracker(WorkerResourceManager manager, WorkerLivelinessMonitor workerLivelinessMonitor) { + /** node heartbeat interval in query running */ + private int activeInterval; + + public TajoResourceTracker(TajoResourceManager manager, NodeLivelinessMonitor nodeLivelinessMonitor) { super(TajoResourceTracker.class.getSimpleName()); this.manager = manager; this.rmContext = manager.getRMContext(); - this.workerLivelinessMonitor = workerLivelinessMonitor; + this.nodeLivelinessMonitor = nodeLivelinessMonitor; } @Override public void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - TajoConf systemConf = (TajoConf) conf; + + TajoConf systemConf = TUtil.checkTypeAndGet(conf, TajoConf.class); + activeInterval = systemConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_ACTIVE_INTERVAL); String confMasterServiceAddr = 
systemConf.getVar(TajoConf.ConfVars.RESOURCE_TRACKER_RPC_ADDRESS); InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr); - try { - server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, 3); - } catch (Exception e) { - LOG.error(e); - throw new IOError(e); - } + int workerNum = systemConf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM); + server = new AsyncRpcServer(TajoResourceTrackerProtocol.class, this, initIsa, workerNum); server.start(); bindAddress = NetUtils.getConnectAddress(server.getListenAddress()); @@ -113,103 +110,97 @@ public class TajoResourceTracker extends AbstractService implements TajoResource super.serviceStop(); } - /** The response builder */ - private static final TajoHeartbeatResponse.Builder builder = - TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE); - - private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) { - return new WorkerStatusEvent( - workerId, - heartbeat.getServerStatus().getRunningTaskNum(), - heartbeat.getServerStatus().getJvmHeap().getMaxHeap(), - heartbeat.getServerStatus().getJvmHeap().getFreeHeap(), - heartbeat.getServerStatus().getJvmHeap().getTotalHeap()); + private static NodeStatusEvent createStatusEvent(NodeHeartbeatRequest heartbeat) { + return new NodeStatusEvent( + heartbeat.getWorkerId(), + heartbeat.getRunningTasks(), + heartbeat.getRunningQueryMasters(), + new NodeResource(heartbeat.getAvailableResource()), + heartbeat.hasTotalResource() ? 
new NodeResource(heartbeat.getTotalResource()) : null); } @Override - public void heartbeat( + public void nodeHeartbeat( RpcController controller, - NodeHeartbeat heartbeat, - RpcCallback done) { + NodeHeartbeatRequest heartbeat, + RpcCallback done) { + NodeHeartbeatResponse.Builder response = NodeHeartbeatResponse.newBuilder(); + ResponseCommand responseCommand = ResponseCommand.NORMAL; try { // get a workerId from the heartbeat - int workerId = heartbeat.getConnectionInfo().getId(); + int workerId = heartbeat.getWorkerId(); - if(rmContext.getWorkers().containsKey(workerId)) { // if worker is running + if(rmContext.getNodes().containsKey(workerId)) { // if node is running // status update - rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(workerId, heartbeat)); + rmContext.getDispatcher().getEventHandler().handle(createStatusEvent(heartbeat)); + + //refresh scheduler resource + rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE)); + // refresh ping - workerLivelinessMonitor.receivedPing(workerId); + nodeLivelinessMonitor.receivedPing(workerId); - } else if (rmContext.getInactiveWorkers().containsKey(workerId)) { // worker was inactive + } else if (rmContext.getInactiveNodes().containsKey(workerId)) { // node was inactive + if (!heartbeat.hasConnectionInfo()) { + // request membership to worker node + responseCommand = ResponseCommand.MEMBERSHIP; + } else { - // remove the inactive worker from the list of inactive workers. - Worker worker = rmContext.getInactiveWorkers().remove(workerId); - workerLivelinessMonitor.unregister(worker.getWorkerId()); + // remove the inactive nodeStatus from the list of inactive nodes. 
+ NodeStatus nodeStatus = rmContext.getInactiveNodes().remove(workerId); + nodeLivelinessMonitor.unregister(nodeStatus.getWorkerId()); - // create new worker instance - Worker newWorker = createWorkerResource(heartbeat); - int newWorkerId = newWorker.getWorkerId(); - // add the new worker to the list of active workers - rmContext.getWorkers().putIfAbsent(newWorkerId, newWorker); + // create new nodeStatus instance + NodeStatus newNodeStatus = createNodeStatus(heartbeat); + int newWorkerId = newNodeStatus.getWorkerId(); + // add the new nodeStatus to the list of active nodes + rmContext.getNodes().putIfAbsent(newWorkerId, newNodeStatus); - // Transit the worker to RUNNING - rmContext.getDispatcher().getEventHandler().handle(new WorkerEvent(newWorkerId, WorkerEventType.STARTED)); - // register the worker to the liveliness monitor - workerLivelinessMonitor.register(newWorkerId); + // Transit the nodeStatus to RUNNING + rmContext.getDispatcher().getEventHandler().handle(new NodeEvent(newWorkerId, NodeEventType.STARTED)); + // register the nodeStatus to the liveliness monitor + nodeLivelinessMonitor.register(newWorkerId); - } else { // if new worker pings firstly + rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE)); + } - // create new worker instance - Worker newWorker = createWorkerResource(heartbeat); - Worker oldWorker = rmContext.getWorkers().putIfAbsent(workerId, newWorker); + } else { // if new node pings firstly - if (oldWorker == null) { - // Transit the worker to RUNNING - rmContext.rmDispatcher.getEventHandler().handle(new WorkerEvent(workerId, WorkerEventType.STARTED)); + // The pings have not membership information + if (!heartbeat.hasConnectionInfo()) { + // request membership to node + responseCommand = ResponseCommand.MEMBERSHIP; } else { - LOG.info("Reconnect from the node at: " + workerId); - workerLivelinessMonitor.unregister(workerId); - rmContext.getDispatcher().getEventHandler().handle(new 
WorkerReconnectEvent(workerId, newWorker)); - } - workerLivelinessMonitor.register(workerId); + // create new node instance + NodeStatus newNodeStatus = createNodeStatus(heartbeat); + NodeStatus oldNodeStatus = rmContext.getNodes().putIfAbsent(workerId, newNodeStatus); + + if (oldNodeStatus == null) { + // Transit the worker to RUNNING + rmContext.rmDispatcher.getEventHandler().handle(new NodeEvent(workerId, NodeEventType.STARTED)); + } else { + LOG.info("Reconnect from the node at: " + workerId); + nodeLivelinessMonitor.unregister(workerId); + rmContext.getDispatcher().getEventHandler().handle(new NodeReconnectEvent(workerId, newNodeStatus)); + } + + nodeLivelinessMonitor.register(workerId); + rmContext.getDispatcher().getEventHandler().handle(new SchedulerEvent(SchedulerEventType.RESOURCE_UPDATE)); + } } - } finally { - builder.setClusterResourceSummary(manager.getClusterResourceSummary()); - done.run(builder.build()); + if(manager.getScheduler().getRunningQuery() > 0) { + response.setHeartBeatInterval(activeInterval); + } + done.run(response.setCommand(responseCommand).build()); } } - @Override - public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request, - RpcCallback done) { - //TODO implement with ResourceManager for scheduler - TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.Builder - response = TajoResourceTrackerProtocol.NodeHeartbeatResponseProto.newBuilder(); - done.run(response.setCommand(TajoResourceTrackerProtocol.ResponseCommand.NORMAL).build()); - } - - private Worker createWorkerResource(NodeHeartbeat request) { - WorkerResource workerResource = new WorkerResource(); - - if(request.getServerStatus() != null) { - workerResource.setMemoryMB(request.getServerStatus().getMemoryResourceMB()); - workerResource.setCpuCoreSlots(request.getServerStatus().getSystem().getAvailableProcessors()); - workerResource.setDiskSlots(request.getServerStatus().getDiskSlots()); - 
workerResource.setNumRunningTasks(request.getServerStatus().getRunningTaskNum()); - workerResource.setMaxHeap(request.getServerStatus().getJvmHeap().getMaxHeap()); - workerResource.setFreeHeap(request.getServerStatus().getJvmHeap().getFreeHeap()); - workerResource.setTotalHeap(request.getServerStatus().getJvmHeap().getTotalHeap()); - } else { - workerResource.setMemoryMB(4096); - workerResource.setDiskSlots(4); - workerResource.setCpuCoreSlots(4); - } - - return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo())); + private NodeStatus createNodeStatus(NodeHeartbeatRequest request) { + return new NodeStatus(rmContext, new NodeResource(request.getTotalResource()), + new WorkerConnectionInfo(request.getConnectionInfo())); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java deleted file mode 100644 index 8c5b96c..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainer.java +++ /dev/null @@ -1,125 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -import org.apache.hadoop.yarn.api.records.*; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; - - -public class TajoWorkerContainer extends TajoContainer { - TajoContainerId id; - NodeId nodeId; - Worker worker; - - public Worker getWorkerResource() { - return worker; - } - - public void setWorkerResource(Worker workerResource) { - this.worker = workerResource; - } - - @Override - public TajoContainerId getId() { - return id; - } - - @Override - public void setId(TajoContainerId id) { - this.id = id; - } - - @Override - public NodeId getNodeId() { - return nodeId; - } - - @Override - public void setNodeId(NodeId nodeId) { - this.nodeId = nodeId; - } - - @Override - public String getNodeHttpAddress() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void setNodeHttpAddress(String nodeHttpAddress) { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public Resource getResource() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void setResource(Resource resource) { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public Priority getPriority() { - return null; //To change body of implemented methods use File | Settings | File Templates. 
- } - - @Override - public void setPriority(Priority priority) { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public Token getContainerToken() { - return null; //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void setContainerToken(Token containerToken) { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public int compareTo(TajoContainer container) { - return getId().compareTo(container.getId()); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TajoWorkerContainer that = (TajoWorkerContainer) o; - - if (id != null ? !id.equals(that.id) : that.id != null) return false; - if (nodeId != null ? !nodeId.equals(that.nodeId) : that.nodeId != null) return false; - if (worker != null ? !worker.equals(that.worker) : that.worker != null) return false; - - return true; - } - - @Override - public int hashCode() { - int result = id != null ? id.hashCode() : 0; - result = 31 * result + (nodeId != null ? nodeId.hashCode() : 0); - result = 31 * result + (worker != null ? worker.hashCode() : 0); - return result; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/5a02873d/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java deleted file mode 100644 index 184de71..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerContainerId.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.rm; - -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.proto.YarnProtos; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.master.container.TajoContainerId; - -public class TajoWorkerContainerId extends TajoContainerId { - ApplicationAttemptId applicationAttemptId; - int id; - - @Override - public ApplicationAttemptId getApplicationAttemptId() { - return applicationAttemptId; - } - - @Override - public void setApplicationAttemptId(ApplicationAttemptId atId) { - this.applicationAttemptId = atId; - } - - @Override - public int getId() { - return id; - } - - @Override - public void setId(int id) { - this.id = id; - } - - public ContainerProtocol.TajoContainerIdProto getProto() { - YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder() - .setClusterTimestamp(applicationAttemptId.getApplicationId().getClusterTimestamp()) - .setId(applicationAttemptId.getApplicationId().getId()) - .build(); - - YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder() - .setAttemptId(applicationAttemptId.getAttemptId()) - .setApplicationId(appIdProto) - .build(); - - return 
ContainerProtocol.TajoContainerIdProto.newBuilder() - .setAppAttemptId(attemptIdProto) - .setAppId(appIdProto) - .setId(id) - .build(); - } - - public static ContainerProtocol.TajoContainerIdProto getContainerIdProto(TajoContainerId containerId) { - if(containerId instanceof TajoWorkerContainerId) { - return ((TajoWorkerContainerId)containerId).getProto(); - } else { - YarnProtos.ApplicationIdProto appIdProto = YarnProtos.ApplicationIdProto.newBuilder() - .setClusterTimestamp(containerId.getApplicationAttemptId().getApplicationId().getClusterTimestamp()) - .setId(containerId.getApplicationAttemptId().getApplicationId().getId()) - .build(); - - YarnProtos.ApplicationAttemptIdProto attemptIdProto = YarnProtos.ApplicationAttemptIdProto.newBuilder() - .setAttemptId(containerId.getApplicationAttemptId().getAttemptId()) - .setApplicationId(appIdProto) - .build(); - - return ContainerProtocol.TajoContainerIdProto.newBuilder() - .setAppAttemptId(attemptIdProto) - .setAppId(appIdProto) - .setId(containerId.getId()) - .build(); - } - } - - @Override - protected void build() { - - } -}