Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 954D510E30 for ; Fri, 10 Apr 2015 06:39:20 +0000 (UTC) Received: (qmail 25454 invoked by uid 500); 10 Apr 2015 06:39:20 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 25390 invoked by uid 500); 10 Apr 2015 06:39:20 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 25381 invoked by uid 99); 10 Apr 2015 06:39:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Apr 2015 06:39:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2AA2AE00C4; Fri, 10 Apr 2015 06:39:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: xgong@apache.org To: common-commits@hadoop.apache.org Message-Id: <4b158dc28a1341c9910d665d0fdfe904@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-3293. Track and display capacity scheduler health metrics in web UI. Contributed by Varun Vasudev Date: Fri, 10 Apr 2015 06:39:20 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 987c9e12e -> afa5d4715 YARN-3293. Track and display capacity scheduler health metrics in web UI. 
Contributed by Varun Vasudev Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/afa5d471 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/afa5d471 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/afa5d471 Branch: refs/heads/trunk Commit: afa5d4715a3aea2a6e93380b014c7bb8f0880383 Parents: 987c9e1 Author: Xuan Authored: Thu Apr 9 23:38:04 2015 -0700 Committer: Xuan Committed: Thu Apr 9 23:38:04 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/scheduler/QueueMetrics.java | 8 + .../scheduler/SchedulerHealth.java | 236 +++++++++++++ .../scheduler/capacity/CSAssignment.java | 75 ++-- .../scheduler/capacity/CapacityScheduler.java | 103 +++++- .../scheduler/capacity/LeafQueue.java | 92 +++-- .../scheduler/capacity/ParentQueue.java | 23 +- .../scheduler/common/AssignmentInformation.java | 120 +++++++ .../webapp/CapacitySchedulerPage.java | 97 ++++- .../webapp/JAXBContextResolver.java | 2 +- .../resourcemanager/webapp/RMWebServices.java | 2 +- .../webapp/dao/CapacitySchedulerHealthInfo.java | 125 +++++++ .../webapp/dao/CapacitySchedulerInfo.java | 5 +- .../scheduler/TestSchedulerHealth.java | 351 +++++++++++++++++++ .../capacity/TestCapacityScheduler.java | 4 +- .../webapp/TestRMWebServicesCapacitySched.java | 6 +- 16 files changed, 1178 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e6a1cf6..72af3ba 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -104,6 +104,9 @@ Release 2.8.0 - UNRELEASED YARN-3294. 
Allow dumping of Capacity Scheduler debug logs via web UI for a fixed time period. (Varun Vasudev via xgong) + YARN-3293. Track and display capacity scheduler health metrics + in web UI. (Varun Vasudev via xgong) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 58b1ed1..30c1113 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -552,4 +552,12 @@ public class QueueMetrics implements MetricsSource { public MetricsSystem getMetricsSystem() { return metricsSystem; } + + public long getAggregateAllocatedContainers() { + return aggregateContainersAllocated.value(); + } + + public long getAggegatedReleasedContainers() { + return aggregateContainersReleased.value(); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerHealth.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerHealth.java new file mode 100644 index 0000000..a987cfa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerHealth.java @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class SchedulerHealth { + + static public class DetailedInformation { + long timestamp; + NodeId nodeId; + ContainerId containerId; + String queue; + + public DetailedInformation(long timestamp, NodeId nodeId, + ContainerId containerId, String queue) { + this.timestamp = timestamp; + this.nodeId = nodeId; + this.containerId = containerId; + this.queue = queue; + } + + public long getTimestamp() { + return timestamp; + } + + public NodeId getNodeId() { + return nodeId; + } + + public ContainerId getContainerId() { + return containerId; + } + + public String getQueue() { + return queue; + } + } + + enum Operation { + ALLOCATION, RELEASE, PREEMPTION, RESERVATION, FULFILLED_RESERVATION + } + + long lastSchedulerRunTime; + Map lastSchedulerRunDetails; + Map schedulerHealthDetails; + Map schedulerOperationCounts; + // this is for counts since the RM started, never reset + Map schedulerOperationAggregateCounts; + + public SchedulerHealth() { + lastSchedulerRunDetails = new ConcurrentHashMap<>(); + schedulerHealthDetails = new ConcurrentHashMap<>(); + schedulerOperationCounts = new ConcurrentHashMap<>(); + schedulerOperationAggregateCounts = new ConcurrentHashMap<>(); + for (Operation op : Operation.values()) { + lastSchedulerRunDetails.put(op, Resource.newInstance(0, 0)); + schedulerOperationCounts.put(op, 0L); + schedulerHealthDetails.put(op, new DetailedInformation(0, null, null, + null)); + schedulerOperationAggregateCounts.put(op, 0L); + } + + } + + public void updateAllocation(long timestamp, NodeId nodeId, + ContainerId containerId, String queue) { + DetailedInformation di = + new DetailedInformation(timestamp, nodeId, containerId, queue); + 
schedulerHealthDetails.put(Operation.ALLOCATION, di); + } + + public void updateRelease(long timestamp, NodeId nodeId, + ContainerId containerId, String queue) { + DetailedInformation di = + new DetailedInformation(timestamp, nodeId, containerId, queue); + schedulerHealthDetails.put(Operation.RELEASE, di); + } + + public void updatePreemption(long timestamp, NodeId nodeId, + ContainerId containerId, String queue) { + DetailedInformation di = + new DetailedInformation(timestamp, nodeId, containerId, queue); + schedulerHealthDetails.put(Operation.PREEMPTION, di); + } + + public void updateReservation(long timestamp, NodeId nodeId, + ContainerId containerId, String queue) { + DetailedInformation di = + new DetailedInformation(timestamp, nodeId, containerId, queue); + schedulerHealthDetails.put(Operation.RESERVATION, di); + } + + public void updateSchedulerRunDetails(long timestamp, Resource allocated, + Resource reserved) { + lastSchedulerRunTime = timestamp; + lastSchedulerRunDetails.put(Operation.ALLOCATION, allocated); + lastSchedulerRunDetails.put(Operation.RESERVATION, reserved); + } + + public void updateSchedulerReleaseDetails(long timestamp, Resource released) { + lastSchedulerRunTime = timestamp; + lastSchedulerRunDetails.put(Operation.RELEASE, released); + } + + public void updateSchedulerReleaseCounts(long count) { + updateCounts(Operation.RELEASE, count); + } + + public void updateSchedulerAllocationCounts(long count) { + updateCounts(Operation.ALLOCATION, count); + } + + public void updateSchedulerReservationCounts(long count) { + updateCounts(Operation.RESERVATION, count); + } + + public void updateSchedulerFulfilledReservationCounts(long count) { + updateCounts(Operation.FULFILLED_RESERVATION, count); + } + + public void updateSchedulerPreemptionCounts(long count) { + updateCounts(Operation.PREEMPTION, count); + } + + private void updateCounts(Operation op, long count) { + schedulerOperationCounts.put(op, count); + Long tmp = 
schedulerOperationAggregateCounts.get(op); + schedulerOperationAggregateCounts.put(op, tmp + count); + } + + public long getLastSchedulerRunTime() { + return lastSchedulerRunTime; + } + + private Resource getResourceDetails(Operation op) { + return lastSchedulerRunDetails.get(op); + } + + public Resource getResourcesAllocated() { + return getResourceDetails(Operation.ALLOCATION); + } + + public Resource getResourcesReserved() { + return getResourceDetails(Operation.RESERVATION); + } + + public Resource getResourcesReleased() { + return getResourceDetails(Operation.RELEASE); + } + + private DetailedInformation getDetailedInformation(Operation op) { + return schedulerHealthDetails.get(op); + } + + public DetailedInformation getLastAllocationDetails() { + return getDetailedInformation(Operation.ALLOCATION); + } + + public DetailedInformation getLastReleaseDetails() { + return getDetailedInformation(Operation.RELEASE); + } + + public DetailedInformation getLastReservationDetails() { + return getDetailedInformation(Operation.RESERVATION); + } + + public DetailedInformation getLastPreemptionDetails() { + return getDetailedInformation(Operation.PREEMPTION); + } + + private Long getOperationCount(Operation op) { + return schedulerOperationCounts.get(op); + } + + public Long getAllocationCount() { + return getOperationCount(Operation.ALLOCATION); + } + + public Long getReleaseCount() { + return getOperationCount(Operation.RELEASE); + } + + public Long getReservationCount() { + return getOperationCount(Operation.RESERVATION); + } + + public Long getPreemptionCount() { + return getOperationCount(Operation.PREEMPTION); + } + + private Long getAggregateOperationCount(Operation op) { + return schedulerOperationAggregateCounts.get(op); + } + + public Long getAggregateAllocationCount() { + return getAggregateOperationCount(Operation.ALLOCATION); + } + + public Long getAggregateReleaseCount() { + return getAggregateOperationCount(Operation.RELEASE); + } + + public Long 
getAggregateReservationCount() { + return getAggregateOperationCount(Operation.RESERVATION); + } + + public Long getAggregatePreemptionCount() { + return getAggregateOperationCount(Operation.PREEMPTION); + } + + public Long getAggregateFulFilledReservationsCount() { + return getAggregateOperationCount(Operation.FULFILLED_RESERVATION); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index d5b65ba..2ba2709 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -22,40 +22,46 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.util.resource.Resources; @Private 
@Unstable public class CSAssignment { + final private Resource resource; private NodeType type; private final RMContainer excessReservation; private final FiCaSchedulerApp application; private final boolean skipped; - + private boolean fulfilledReservation; + private final AssignmentInformation assignmentInformation; + public CSAssignment(Resource resource, NodeType type) { - this.resource = resource; - this.type = type; - this.application = null; - this.excessReservation = null; - this.skipped = false; + this(resource, type, null, null, false, false); } - - public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) { - this.resource = excessReservation.getContainer().getResource(); - this.type = NodeType.NODE_LOCAL; - this.application = application; - this.excessReservation = excessReservation; - this.skipped = false; + + public CSAssignment(FiCaSchedulerApp application, + RMContainer excessReservation) { + this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL, + excessReservation, application, false, false); } - + public CSAssignment(boolean skipped) { - this.resource = Resources.createResource(0, 0); - this.type = NodeType.NODE_LOCAL; - this.application = null; - this.excessReservation = null; + this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped, + false); + } + + public CSAssignment(Resource resource, NodeType type, + RMContainer excessReservation, FiCaSchedulerApp application, + boolean skipped, boolean fulfilledReservation) { + this.resource = resource; + this.type = type; + this.excessReservation = excessReservation; + this.application = application; this.skipped = skipped; + this.fulfilledReservation = fulfilledReservation; + this.assignmentInformation = new AssignmentInformation(); } public Resource getResource() { @@ -84,6 +90,35 @@ public class CSAssignment { @Override public String toString() { - return resource.getMemory() + ":" + type; + String ret = "resource:" + resource.toString(); + 
ret += "; type:" + type; + ret += "; excessReservation:" + excessReservation; + ret += + "; applicationid:" + + (application != null ? application.getApplicationId().toString() + : "null"); + ret += "; skipped:" + skipped; + ret += "; fulfilled reservation:" + fulfilledReservation; + ret += + "; allocations(count/resource):" + + assignmentInformation.getNumAllocations() + "/" + + assignmentInformation.getAllocated().toString(); + ret += + "; reservations(count/resource):" + + assignmentInformation.getNumReservations() + "/" + + assignmentInformation.getReserved().toString(); + return ret; + } + + public void setFulfilledReservation(boolean fulfilledReservation) { + this.fulfilledReservation = fulfilledReservation; + } + + public boolean isFulfilledReservation() { + return this.fulfilledReservation; + } + + public AssignmentInformation getAssignmentInformation() { + return this.assignmentInformation; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index c86c0ff..e93c529 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; @@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -91,9 +93,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundExce import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -213,7 +217,8 @@ public class CapacityScheduler extends private boolean scheduleAsynchronously; private AsyncScheduleThread asyncSchedulerThread; private RMNodeLabelsManager labelManager; - + private SchedulerHealth schedulerHealth = new SchedulerHealth(); + long lastNodeUpdateTime; /** * EXPERT */ @@ -955,6 +960,8 @@ public class CapacityScheduler extends LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource); } + Resource releaseResources = Resource.newInstance(0, 0); + FiCaSchedulerNode node = getNode(nm.getNodeID()); List containerInfoList = nm.pullContainerUpdates(); @@ -971,13 +978,30 @@ public class CapacityScheduler extends } // Process completed containers + int releasedContainers = 0; for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); + RMContainer container = getRMContainer(containerId); LOG.debug("Container FINISHED: " + containerId); - completedContainer(getRMContainer(containerId), - completedContainer, RMContainerEventType.FINISHED); + completedContainer(container, completedContainer, + RMContainerEventType.FINISHED); + if (container != null) { + releasedContainers++; + Resource rs = container.getAllocatedResource(); + if (rs != null) { + Resources.addTo(releaseResources, rs); + } + rs = container.getReservedResource(); + if (rs != null) { + Resources.addTo(releaseResources, rs); + } + } } + schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime, + 
releaseResources); + schedulerHealth.updateSchedulerReleaseCounts(releasedContainers); + // Now node data structures are upto date and ready for scheduling. if(LOG.isDebugEnabled()) { LOG.debug("Node being looked for scheduling " + nm @@ -1040,11 +1064,47 @@ public class CapacityScheduler extends node.updateLabels(newLabels); } + private void updateSchedulerHealth(long now, FiCaSchedulerNode node, + CSAssignment assignment) { + + NodeId nodeId = node.getNodeID(); + List allocations = + assignment.getAssignmentInformation().getAllocationDetails(); + List reservations = + assignment.getAssignmentInformation().getReservationDetails(); + if (!allocations.isEmpty()) { + ContainerId allocatedContainerId = + allocations.get(allocations.size() - 1).containerId; + String allocatedQueue = allocations.get(allocations.size() - 1).queue; + schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId, + allocatedQueue); + } + if (!reservations.isEmpty()) { + ContainerId reservedContainerId = + reservations.get(reservations.size() - 1).containerId; + String reservedQueue = reservations.get(reservations.size() - 1).queue; + schedulerHealth.updateReservation(now, nodeId, reservedContainerId, + reservedQueue); + } + schedulerHealth.updateSchedulerReservationCounts(assignment + .getAssignmentInformation().getNumReservations()); + schedulerHealth.updateSchedulerAllocationCounts(assignment + .getAssignmentInformation().getNumAllocations()); + schedulerHealth.updateSchedulerRunDetails(now, assignment + .getAssignmentInformation().getAllocated(), assignment + .getAssignmentInformation().getReserved()); + } + private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { return; } + // reset allocation and reservation stats before we start doing any work + updateSchedulerHealth(lastNodeUpdateTime, node, + new CSAssignment(Resources.none(), NodeType.NODE_LOCAL)); + 
+ CSAssignment assignment; // Assign new containers... // 1. Check for reserved applications @@ -1061,15 +1121,26 @@ public class CapacityScheduler extends node.getNodeID()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); - CSAssignment assignment = - queue.assignContainers( + assignment = queue.assignContainers( clusterResource, node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, clusterResource))); - + if (assignment.isFulfilledReservation()) { + CSAssignment tmp = + new CSAssignment(reservedContainer.getReservedResource(), + assignment.getType()); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + reservedContainer.getReservedResource()); + tmp.getAssignmentInformation().addAllocationDetails( + reservedContainer.getContainerId(), queue.getQueuePath()); + tmp.getAssignmentInformation().incrAllocations(); + updateSchedulerHealth(lastNodeUpdateTime, node, tmp); + schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + } + RMContainer excessReservation = assignment.getExcessReservation(); if (excessReservation != null) { Container container = excessReservation.getContainer(); @@ -1092,13 +1163,14 @@ public class CapacityScheduler extends LOG.debug("Trying to schedule on node: " + node.getNodeName() + ", available: " + node.getAvailableResource()); } - root.assignContainers( + assignment = root.assignContainers( clusterResource, node, // TODO, now we only consider limits for parent for non-labeled // resources, should consider labeled resources as well. 
new ResourceLimits(labelManager.getResourceByLabel( RMNodeLabelsManager.NO_LABEL, clusterResource))); + updateSchedulerHealth(lastNodeUpdateTime, node, assignment); } } else { LOG.info("Skipping scheduling since node " + node.getNodeID() + @@ -1151,6 +1223,7 @@ public class CapacityScheduler extends { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; RMNode node = nodeUpdatedEvent.getRMNode(); + setLastNodeUpdateTime(Time.now()); nodeUpdate(node); if (!scheduleAsynchronously) { allocateContainersToNode(getNode(node.getNodeID())); @@ -1319,6 +1392,14 @@ public class CapacityScheduler extends LOG.info("Application attempt " + application.getApplicationAttemptId() + " released container " + container.getId() + " on node: " + node + " with event: " + event); + if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) { + schedulerHealth.updatePreemption(Time.now(), container.getNodeId(), + container.getId(), queue.getQueuePath()); + schedulerHealth.updateSchedulerPreemptionCounts(1); + } else { + schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(), + container.getId(), queue.getQueuePath()); + } } @Lock(Lock.NoLock.class) @@ -1648,4 +1729,12 @@ public class CapacityScheduler extends } return ret; } + + public SchedulerHealth getSchedulerHealth() { + return this.schedulerHealth; + } + + private synchronized void setLastNodeUpdateTime(long time) { + this.lastNodeUpdateTime = time; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3e5405d..59a016f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -913,12 +913,17 @@ public class LeafQueue extends AbstractCSQueue { } // Try to assign if we have sufficient resources - assignContainersOnNode(clusterResource, node, application, priority, - rmContainer); + CSAssignment tmp = + assignContainersOnNode(clusterResource, node, application, priority, + rmContainer); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* - return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); + CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); + if (tmp.getAssignmentInformation().getNumAllocations() > 0) { + ret.setFulfilledReservation(true); + } + return ret; } protected Resource getHeadroom(User user, Resource queueCurrentLimit, @@ -1172,7 +1177,8 @@ public class LeafQueue extends AbstractCSQueue { private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer) { - Resource assigned = Resources.none(); + + CSAssignment assigned; NodeType requestType = null; MutableObject allocatedContainer = new MutableObject(); @@ -1186,14 +1192,15 @@ public class LeafQueue extends AbstractCSQueue { node, application, priority, reservedContainer, allocatedContainer); if (Resources.greaterThan(resourceCalculator, clusterResource, - assigned, Resources.none())) { + 
assigned.getResource(), Resources.none())) { //update locality statistics if (allocatedContainer.getValue() != null) { application.incNumAllocatedContainers(NodeType.NODE_LOCAL, requestType); } - return new CSAssignment(assigned, NodeType.NODE_LOCAL); + assigned.setType(NodeType.NODE_LOCAL); + return assigned; } } @@ -1214,14 +1221,15 @@ public class LeafQueue extends AbstractCSQueue { node, application, priority, reservedContainer, allocatedContainer); if (Resources.greaterThan(resourceCalculator, clusterResource, - assigned, Resources.none())) { + assigned.getResource(), Resources.none())) { //update locality statistics if (allocatedContainer.getValue() != null) { application.incNumAllocatedContainers(NodeType.RACK_LOCAL, requestType); } - return new CSAssignment(assigned, NodeType.RACK_LOCAL); + assigned.setType(NodeType.RACK_LOCAL); + return assigned; } } @@ -1246,7 +1254,8 @@ public class LeafQueue extends AbstractCSQueue { if (allocatedContainer.getValue() != null) { application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType); } - return new CSAssignment(assigned, NodeType.OFF_SWITCH); + assigned.setType(NodeType.OFF_SWITCH); + return assigned; } return SKIP_ASSIGNMENT; @@ -1255,10 +1264,9 @@ public class LeafQueue extends AbstractCSQueue { private Resource getMinimumResourceNeedUnreserved(Resource askedResource) { // First we need to get minimum resource we need unreserve // minimum-resource-need-unreserve = used + asked - limit - Resource minimumUnreservedResource = - Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource), - currentResourceLimits.getLimit()); - return minimumUnreservedResource; + return Resources.subtract( + Resources.add(queueUsage.getUsed(), askedResource), + currentResourceLimits.getLimit()); } @Private @@ -1334,7 +1342,7 @@ public class LeafQueue extends AbstractCSQueue { } - private Resource assignNodeLocalContainers(Resource clusterResource, + private CSAssignment assignNodeLocalContainers(Resource 
clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer) { @@ -1344,11 +1352,11 @@ public class LeafQueue extends AbstractCSQueue { nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, allocatedContainer); } - - return Resources.none(); + + return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); } - private Resource assignRackLocalContainers(Resource clusterResource, + private CSAssignment assignRackLocalContainers(Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer) { @@ -1358,11 +1366,11 @@ public class LeafQueue extends AbstractCSQueue { rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, allocatedContainer); } - - return Resources.none(); + + return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL); } - private Resource assignOffSwitchContainers(Resource clusterResource, + private CSAssignment assignOffSwitchContainers(Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer) { @@ -1373,7 +1381,7 @@ public class LeafQueue extends AbstractCSQueue { allocatedContainer); } - return Resources.none(); + return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH); } boolean canAssign(FiCaSchedulerApp application, Priority priority, @@ -1443,15 +1451,13 @@ public class LeafQueue extends AbstractCSQueue { .getApplicationAttemptId(), application.getNewContainerId()); // Create the container - Container container = - BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() - .getHttpAddress(), capability, priority, null); - - return container; + return 
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode() + .getHttpAddress(), capability, priority, null); + } - private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node, + private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, MutableObject createdContainer) { @@ -1472,7 +1478,7 @@ public class LeafQueue extends AbstractCSQueue { if (rmContainer != null) { unreserve(application, priority, node, rmContainer); } - return Resources.none(); + return new CSAssignment(Resources.none(), type); } Resource capability = request.getCapability(); @@ -1484,7 +1490,7 @@ public class LeafQueue extends AbstractCSQueue { LOG.warn("Node : " + node.getNodeID() + " does not have sufficient resource for request : " + request + " node total capability : " + node.getTotalResource()); - return Resources.none(); + return new CSAssignment(Resources.none(), type); } assert Resources.greaterThan( @@ -1497,7 +1503,7 @@ public class LeafQueue extends AbstractCSQueue { // something went wrong getting/creating the container if (container == null) { LOG.warn("Couldn't get container for allocation!"); - return Resources.none(); + return new CSAssignment(Resources.none(), type); } boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( @@ -1529,7 +1535,7 @@ public class LeafQueue extends AbstractCSQueue { // container (That means we *have to* unreserve some resource to // continue)). If we failed to unreserve some resource, if (!containerUnreserved) { - return Resources.none(); + return new CSAssignment(Resources.none(), type); } } } @@ -1540,7 +1546,7 @@ public class LeafQueue extends AbstractCSQueue { // Does the application need this resource? 
if (allocatedContainer == null) { - return Resources.none(); + return new CSAssignment(Resources.none(), type); } // Inform the node @@ -1552,7 +1558,13 @@ public class LeafQueue extends AbstractCSQueue { " queue=" + this + " clusterResource=" + clusterResource); createdContainer.setValue(allocatedContainer); - return container.getResource(); + CSAssignment assignment = new CSAssignment(container.getResource(), type); + assignment.getAssignmentInformation().addAllocationDetails( + container.getId(), getQueuePath()); + assignment.getAssignmentInformation().incrAllocations(); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + container.getResource()); + return assignment; } else { // if we are allowed to allocate but this node doesn't have space, reserve it or // if this was an already a reserved container, reserve it again @@ -1566,7 +1578,7 @@ public class LeafQueue extends AbstractCSQueue { // reserve the new container if (!checkLimitsToReserve(clusterResource, application, capability)) { - return Resources.none(); + return new CSAssignment(Resources.none(), type); } } @@ -1581,10 +1593,16 @@ public class LeafQueue extends AbstractCSQueue { " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" + clusterResource); - - return request.getCapability(); + CSAssignment assignment = + new CSAssignment(request.getCapability(), type); + assignment.getAssignmentInformation().addReservationDetails( + container.getId(), getQueuePath()); + assignment.getAssignmentInformation().incrReservations(); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + request.getCapability()); + return assignment; } - return Resources.none(); + return new CSAssignment(Resources.none(), type); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 5ed6bb8..882498a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -415,7 +416,27 @@ public class ParentQueue extends AbstractCSQueue { nodeLabels); // Track resource utilization in this pass of the scheduler - Resources.addTo(assignment.getResource(), assignedToChild.getResource()); + 
Resources + .addTo(assignment.getResource(), assignedToChild.getResource()); + Resources.addTo(assignment.getAssignmentInformation().getAllocated(), + assignedToChild.getAssignmentInformation().getAllocated()); + Resources.addTo(assignment.getAssignmentInformation().getReserved(), + assignedToChild.getAssignmentInformation().getReserved()); + assignment.getAssignmentInformation().incrAllocations( + assignedToChild.getAssignmentInformation().getNumAllocations()); + assignment.getAssignmentInformation().incrReservations( + assignedToChild.getAssignmentInformation().getNumReservations()); + assignment + .getAssignmentInformation() + .getAllocationDetails() + .addAll( + assignedToChild.getAssignmentInformation().getAllocationDetails()); + assignment + .getAssignmentInformation() + .getReservationDetails() + .addAll( + assignedToChild.getAssignmentInformation() + .getReservationDetails()); LOG.info("assignedContainer" + " queue=" + getQueueName() + http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java new file mode 100644 index 0000000..c5c067d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class AssignmentInformation { + + public enum Operation { + ALLOCATION, RESERVATION + } + + public static class AssignmentDetails { + public ContainerId containerId; + public String queue; + + public AssignmentDetails(ContainerId containerId, String queue) { + this.containerId = containerId; + this.queue = queue; + } + } + + private final Map<Operation, Integer> operationCounts; + private final Map<Operation, Resource> operationResources; + private final Map<Operation, List<AssignmentDetails>> operationDetails; + + public AssignmentInformation() { + this.operationCounts = new HashMap<>(); + this.operationResources = new HashMap<>(); + this.operationDetails = new HashMap<>(); + for (Operation op : Operation.values()) { + operationCounts.put(op, 0); + operationResources.put(op, Resource.newInstance(0, 0)); +
operationDetails.put(op, new ArrayList<AssignmentDetails>()); + } + } + + public int getNumAllocations() { + return operationCounts.get(Operation.ALLOCATION); + } + + public void incrAllocations() { + increment(Operation.ALLOCATION, 1); + } + + public void incrAllocations(int by) { + increment(Operation.ALLOCATION, by); + } + + public int getNumReservations() { + return operationCounts.get(Operation.RESERVATION); + } + + public void incrReservations() { + increment(Operation.RESERVATION, 1); + } + + public void incrReservations(int by) { + increment(Operation.RESERVATION, by); + } + + private void increment(Operation op, int by) { + operationCounts.put(op, operationCounts.get(op) + by); + } + + public Resource getAllocated() { + return operationResources.get(Operation.ALLOCATION); + } + + public Resource getReserved() { + return operationResources.get(Operation.RESERVATION); + } + + private void addAssignmentDetails(Operation op, ContainerId containerId, + String queue) { + operationDetails.get(op).add(new AssignmentDetails(containerId, queue)); + } + + public void addAllocationDetails(ContainerId containerId, String queue) { + addAssignmentDetails(Operation.ALLOCATION, containerId, queue); + } + + public void addReservationDetails(ContainerId containerId, String queue) { + addAssignmentDetails(Operation.RESERVATION, containerId, queue); + } + + public List<AssignmentDetails> getAllocationDetails() { + return operationDetails.get(Operation.ALLOCATION); + } + + public List<AssignmentDetails> getReservationDetails() { + return operationDetails.get(Operation.RESERVATION); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java ---------------------------------------------------------------------- diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java index f1e1e8c..6c34d99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java @@ -21,17 +21,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; import static org.apache.hadoop.yarn.util.StringHelper.join; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo; -import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo; -import org.apache.hadoop.yarn.server.webapp.AppsBlock; +import org.apache.hadoop.yarn.util.Times; import org.apache.hadoop.yarn.webapp.ResponseInfo; import org.apache.hadoop.yarn.webapp.SubView; import 
org.apache.hadoop.yarn.webapp.hamlet.Hamlet; @@ -244,7 +246,7 @@ class CapacitySchedulerPage extends RmView { span(".q", "default")._()._(); } else { CSQueue root = cs.getRootQueue(); - CapacitySchedulerInfo sinfo = new CapacitySchedulerInfo(root); + CapacitySchedulerInfo sinfo = new CapacitySchedulerInfo(root, cs); csqinfo.csinfo = sinfo; csqinfo.qinfo = null; @@ -274,6 +276,95 @@ class CapacitySchedulerPage extends RmView { script().$type("text/javascript"). _("$('#cs').hide();")._()._(). _(RMAppsBlock.class); + html._(HealthBlock.class); + } + } + + public static class HealthBlock extends HtmlBlock { + + final CapacityScheduler cs; + + @Inject + HealthBlock(ResourceManager rm) { + cs = (CapacityScheduler) rm.getResourceScheduler(); + } + + @Override + public void render(HtmlBlock.Block html) { + SchedulerHealth healthInfo = cs.getSchedulerHealth(); + DIV<Hamlet> div = html.div("#health"); + div.h4("Aggregate scheduler counts"); + TBODY<TABLE<DIV<Hamlet>>> tbody = + div.table("#lastrun").thead().$class("ui-widget-header").tr().th() + .$class("ui-state-default")._("Total Container Allocations(count)") + ._().th().$class("ui-state-default") + ._("Total Container Releases(count)")._().th() + .$class("ui-state-default") + ._("Total Fulfilled Reservations(count)")._().th() + .$class("ui-state-default")._("Total Container Preemptions(count)") + ._()._()._().tbody(); + tbody + .$class("ui-widget-content") + .tr() + .td( + String.valueOf(cs.getRootQueueMetrics() + .getAggregateAllocatedContainers())) + .td( + String.valueOf(cs.getRootQueueMetrics() + .getAggegatedReleasedContainers())) + .td(healthInfo.getAggregateFulFilledReservationsCount().toString()) + .td(healthInfo.getAggregatePreemptionCount().toString())._()._()._(); + div.h4("Last scheduler run"); + tbody = + div.table("#lastrun").thead().$class("ui-widget-header").tr().th() + .$class("ui-state-default")._("Time")._().th() + .$class("ui-state-default")._("Allocations(count - resources)")._() +
.th().$class("ui-state-default")._("Reservations(count - resources)") + ._().th().$class("ui-state-default")._("Releases(count - resources)") + ._()._()._().tbody(); + tbody + .$class("ui-widget-content") + .tr() + .td(Times.format(healthInfo.getLastSchedulerRunTime())) + .td( + healthInfo.getAllocationCount().toString() + " - " + + healthInfo.getResourcesAllocated().toString()) + .td( + healthInfo.getReservationCount().toString() + " - " + + healthInfo.getResourcesReserved().toString()) + .td( + healthInfo.getReleaseCount().toString() + " - " + + healthInfo.getResourcesReleased().toString())._()._()._(); + Map<String, SchedulerHealth.DetailedInformation> info = new HashMap<>(); + info.put("Allocation", healthInfo.getLastAllocationDetails()); + info.put("Reservation", healthInfo.getLastReservationDetails()); + info.put("Release", healthInfo.getLastReleaseDetails()); + info.put("Preemption", healthInfo.getLastPreemptionDetails()); + + for (Map.Entry<String, SchedulerHealth.DetailedInformation> entry : info + .entrySet()) { + String containerId = "N/A"; + String nodeId = "N/A"; + String queue = "N/A"; + String table = "#" + entry.getKey(); + div.h4("Last " + entry.getKey()); + tbody = + div.table(table).thead().$class("ui-widget-header").tr().th() + .$class("ui-state-default")._("Time")._().th() + .$class("ui-state-default")._("Container Id")._().th() + .$class("ui-state-default")._("Node Id")._().th() + .$class("ui-state-default")._("Queue")._()._()._().tbody(); + SchedulerHealth.DetailedInformation di = entry.getValue(); + if (di.getTimestamp() != 0) { + containerId = di.getContainerId().toString(); + nodeId = di.getNodeId().toString(); + queue = di.getQueue(); + } + tbody.$class("ui-widget-content").tr() + .td(Times.format(di.getTimestamp())).td(containerId).td(nodeId) + .td(queue)._()._()._(); + } + div._(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java index 8d901b1..5f5e1ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java @@ -53,7 +53,7 @@ public class JAXBContextResolver implements ContextResolver { NodesInfo.class, RemoteExceptionData.class, CapacitySchedulerQueueInfoList.class, ResourceInfo.class, UsersInfo.class, UserInfo.class, ApplicationStatisticsInfo.class, - StatisticsItemInfo.class }; + StatisticsItemInfo.class, CapacitySchedulerHealthInfo.class }; // these dao classes need root unwrapping final Class[] rootUnwrappedTypes = { NewApplication.class, ApplicationSubmissionContextInfo.class, http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 
584da7d..8c63d27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -228,7 +228,7 @@ public class RMWebServices { if (rs instanceof CapacityScheduler) { CapacityScheduler cs = (CapacityScheduler) rs; CSQueue root = cs.getRootQueue(); - sinfo = new CapacitySchedulerInfo(root); + sinfo = new CapacitySchedulerInfo(root, cs); } else if (rs instanceof FairScheduler) { FairScheduler fs = (FairScheduler) rs; sinfo = new FairSchedulerInfo(fs); http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerHealthInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerHealthInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerHealthInfo.java new file mode 100644 index 0000000..652c91b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerHealthInfo.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; + +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@XmlAccessorType(XmlAccessType.FIELD) +public class CapacitySchedulerHealthInfo { + + @XmlAccessorType(XmlAccessType.FIELD) + public static class OperationInformation { + String nodeId; + String containerId; + String queue; + + OperationInformation() { + } + + OperationInformation(SchedulerHealth.DetailedInformation di) { + this.nodeId = di.getNodeId() == null ? "N/A" : di.getNodeId().toString(); + this.containerId = + di.getContainerId() == null ? "N/A" : di.getContainerId().toString(); + this.queue = di.getQueue() == null ? 
"N/A" : di.getQueue(); + } + + public String getNodeId() { + return nodeId; + } + + public String getContainerId() { + return containerId; + } + + public String getQueue() { + return queue; + } + } + + @XmlAccessorType(XmlAccessType.FIELD) + public static class LastRunDetails { + String operation; + long count; + ResourceInfo resources; + + LastRunDetails() { + } + + LastRunDetails(String operation, long count, Resource resource) { + this.operation = operation; + this.count = count; + this.resources = new ResourceInfo(resource); + } + + public String getOperation() { + return operation; + } + + public long getCount() { + return count; + } + + public ResourceInfo getResources() { + return resources; + } + } + + long lastrun; + Map<String, OperationInformation> operationsInfo; + List<LastRunDetails> lastRunDetails; + + CapacitySchedulerHealthInfo() { + } + + public long getLastrun() { + return lastrun; + } + + CapacitySchedulerHealthInfo(CapacityScheduler cs) { + SchedulerHealth ht = cs.getSchedulerHealth(); + lastrun = ht.getLastSchedulerRunTime(); + operationsInfo = new HashMap<>(); + operationsInfo.put("last-allocation", + new OperationInformation(ht.getLastAllocationDetails())); + operationsInfo.put("last-release", + new OperationInformation(ht.getLastReleaseDetails())); + operationsInfo.put("last-preemption", + new OperationInformation(ht.getLastPreemptionDetails())); + operationsInfo.put("last-reservation", + new OperationInformation(ht.getLastReservationDetails())); + + lastRunDetails = new ArrayList<>(); + lastRunDetails.add(new LastRunDetails("releases", ht.getReleaseCount(), ht + .getResourcesReleased())); + lastRunDetails.add(new LastRunDetails("allocations", ht + .getAllocationCount(), ht.getResourcesAllocated())); + lastRunDetails.add(new LastRunDetails("reservations", ht + .getReservationCount(), ht.getResourcesReserved())); + + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java index 5f32342..9901878 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java @@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient; import javax.xml.bind.annotation.XmlType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; @XmlRootElement(name = "capacityScheduler") @@ -37,6 +38,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo { protected float maxCapacity; protected String queueName; protected CapacitySchedulerQueueInfoList queues; + protected CapacitySchedulerHealthInfo health; @XmlTransient static final float EPSILON = 1e-8f; @@ -44,7 +46,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo { public CapacitySchedulerInfo() { } // JAXB needs this - public CapacitySchedulerInfo(CSQueue parent) { + public 
CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) { this.queueName = parent.getQueueName(); this.usedCapacity = parent.getUsedCapacity() * 100; this.capacity = parent.getCapacity() * 100; @@ -54,6 +56,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo { this.maxCapacity = max * 100; queues = getQueues(parent); + health = new CapacitySchedulerHealthInfo(cs); } public float getCapacity() { http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java new file mode 100644 index 0000000..df35485 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -0,0 +1,351 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.NodeManager; +import org.apache.hadoop.yarn.server.resourcemanager.Application; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.Task; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assume.assumeTrue; + +public class TestSchedulerHealth { + + private 
ResourceManager resourceManager; + + public void setup() { + resourceManager = new ResourceManager() { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(getConfig()); + return mgr; + } + }; + + YarnConfiguration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager() + .rollMasterKey(); + resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey(); + ((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start(); + } + + @Test + public void testCounts() { + + SchedulerHealth sh = new SchedulerHealth(); + int value = 1; + for (int i = 0; i < 2; ++i) { + sh.updateSchedulerPreemptionCounts(value); + sh.updateSchedulerAllocationCounts(value); + sh.updateSchedulerReservationCounts(value); + sh.updateSchedulerReleaseCounts(value); + + Assert.assertEquals(value, sh.getAllocationCount().longValue()); + Assert.assertEquals(value, sh.getReleaseCount().longValue()); + Assert.assertEquals(value, sh.getReservationCount().longValue()); + Assert.assertEquals(value, sh.getPreemptionCount().longValue()); + + Assert.assertEquals(value * (i + 1), sh.getAggregateAllocationCount() + .longValue()); + Assert.assertEquals(value * (i + 1), sh.getAggregateReleaseCount() + .longValue()); + Assert.assertEquals(value * (i + 1), sh.getAggregateReservationCount() + .longValue()); + Assert.assertEquals(value * (i + 1), sh.getAggregatePreemptionCount() + .longValue()); + + } + } + + @Test + public void testOperationDetails() { + + SchedulerHealth sh = new SchedulerHealth(); + long now = Time.now(); + sh.updateRelease(now, NodeId.newInstance("testhost", 1234), + ContainerId.fromString("container_1427562107907_0002_01_000001"), + "testqueue"); + Assert.assertEquals("container_1427562107907_0002_01_000001", sh + 
.getLastReleaseDetails().getContainerId().toString()); + Assert.assertEquals("testhost:1234", sh.getLastReleaseDetails().getNodeId() + .toString()); + Assert.assertEquals("testqueue", sh.getLastReleaseDetails().getQueue()); + Assert.assertEquals(now, sh.getLastReleaseDetails().getTimestamp()); + Assert.assertEquals(0, sh.getLastSchedulerRunTime()); + + now = Time.now(); + sh.updateReservation(now, NodeId.newInstance("testhost1", 1234), + ContainerId.fromString("container_1427562107907_0003_01_000001"), + "testqueue1"); + Assert.assertEquals("container_1427562107907_0003_01_000001", sh + .getLastReservationDetails().getContainerId().toString()); + Assert.assertEquals("testhost1:1234", sh.getLastReservationDetails() + .getNodeId().toString()); + Assert + .assertEquals("testqueue1", sh.getLastReservationDetails().getQueue()); + Assert.assertEquals(now, sh.getLastReservationDetails().getTimestamp()); + Assert.assertEquals(0, sh.getLastSchedulerRunTime()); + + now = Time.now(); + sh.updateAllocation(now, NodeId.newInstance("testhost2", 1234), + ContainerId.fromString("container_1427562107907_0004_01_000001"), + "testqueue2"); + Assert.assertEquals("container_1427562107907_0004_01_000001", sh + .getLastAllocationDetails().getContainerId().toString()); + Assert.assertEquals("testhost2:1234", sh.getLastAllocationDetails() + .getNodeId().toString()); + Assert.assertEquals("testqueue2", sh.getLastAllocationDetails().getQueue()); + Assert.assertEquals(now, sh.getLastAllocationDetails().getTimestamp()); + Assert.assertEquals(0, sh.getLastSchedulerRunTime()); + + now = Time.now(); + sh.updatePreemption(now, NodeId.newInstance("testhost3", 1234), + ContainerId.fromString("container_1427562107907_0005_01_000001"), + "testqueue3"); + Assert.assertEquals("container_1427562107907_0005_01_000001", sh + .getLastPreemptionDetails().getContainerId().toString()); + Assert.assertEquals("testhost3:1234", sh.getLastPreemptionDetails() + .getNodeId().toString()); + 
Assert.assertEquals("testqueue3", sh.getLastPreemptionDetails().getQueue()); + Assert.assertEquals(now, sh.getLastPreemptionDetails().getTimestamp()); + Assert.assertEquals(0, sh.getLastSchedulerRunTime()); + } + + @Test + public void testResourceUpdate() { + SchedulerHealth sh = new SchedulerHealth(); + long now = Time.now(); + sh.updateSchedulerRunDetails(now, Resource.newInstance(1024, 1), + Resource.newInstance(2048, 1)); + Assert.assertEquals(now, sh.getLastSchedulerRunTime()); + Assert.assertEquals(Resource.newInstance(1024, 1), + sh.getResourcesAllocated()); + Assert.assertEquals(Resource.newInstance(2048, 1), + sh.getResourcesReserved()); + now = Time.now(); + sh.updateSchedulerReleaseDetails(now, Resource.newInstance(3072, 1)); + Assert.assertEquals(now, sh.getLastSchedulerRunTime()); + Assert.assertEquals(Resource.newInstance(3072, 1), + sh.getResourcesReleased()); + } + + private NodeManager registerNode(String hostName, int containerManagerPort, + int httpPort, String rackName, Resource capability) throws IOException, + YarnException { + NodeManager nm = + new NodeManager(hostName, containerManagerPort, httpPort, rackName, + capability, resourceManager); + NodeAddedSchedulerEvent nodeAddEvent1 = + new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes() + .get(nm.getNodeId())); + resourceManager.getResourceScheduler().handle(nodeAddEvent1); + return nm; + } + + private void nodeUpdate(NodeManager nm) { + RMNode node = + resourceManager.getRMContext().getRMNodes().get(nm.getNodeId()); + // Send a heartbeat to kick the tires on the Scheduler + NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node); + resourceManager.getResourceScheduler().handle(nodeUpdate); + } + + @Test + public void testCapacitySchedulerAllocation() throws Exception { + + setup(); + + boolean isCapacityScheduler = + resourceManager.getResourceScheduler() instanceof CapacityScheduler; + assumeTrue("This test is only supported on Capacity Scheduler", + 
isCapacityScheduler); + + // Register node1 + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * 1024, 1)); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit an application + Application application_0 = + new Application("user_0", "default", resourceManager); + application_0.submit(); + + application_0.addNodeManager(host_0, 1234, nm_0); + + Resource capability_0_0 = Resources.createResource(1024, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * 1024, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0 }); + application_0.addTask(task_0_0); + Task task_0_1 = + new Task(application_0, priority_0, new String[] { host_0 }); + application_0.addTask(task_0_1); + + // Send resource requests to the scheduler + application_0.schedule(); + + // Send a heartbeat to kick the tires on the Scheduler + nodeUpdate(nm_0); + SchedulerHealth sh = + ((CapacityScheduler) resourceManager.getResourceScheduler()) + .getSchedulerHealth(); + Assert.assertEquals(2, sh.getAllocationCount().longValue()); + Assert.assertEquals(Resource.newInstance(3 * 1024, 2), + sh.getResourcesAllocated()); + Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue()); + Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails() + .getNodeId().toString()); + Assert.assertEquals("root.default", sh.getLastAllocationDetails() + .getQueue()); + + Task task_0_2 = + new Task(application_0, priority_0, new String[] { host_0 }); + application_0.addTask(task_0_2); + application_0.schedule(); + + nodeUpdate(nm_0); + 
Assert.assertEquals(1, sh.getAllocationCount().longValue()); + Assert.assertEquals(Resource.newInstance(2 * 1024, 1), + sh.getResourcesAllocated()); + Assert.assertEquals(3, sh.getAggregateAllocationCount().longValue()); + Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails() + .getNodeId().toString()); + Assert.assertEquals("root.default", sh.getLastAllocationDetails() + .getQueue()); + } + + @Test + public void testCapacitySchedulerReservation() throws Exception { + + setup(); + + boolean isCapacityScheduler = + resourceManager.getResourceScheduler() instanceof CapacityScheduler; + assumeTrue("This test is only supported on Capacity Scheduler", + isCapacityScheduler); + + // Register nodes + String host_0 = "host_0"; + NodeManager nm_0 = + registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(2 * 1024, 1)); + String host_1 = "host_1"; + NodeManager nm_1 = + registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK, + Resources.createResource(5 * 1024, 1)); + nodeUpdate(nm_0); + nodeUpdate(nm_1); + + // ResourceRequest priorities + Priority priority_0 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(0); + Priority priority_1 = + org.apache.hadoop.yarn.server.resourcemanager.resource.Priority + .create(1); + + // Submit an application + Application application_0 = + new Application("user_0", "default", resourceManager); + application_0.submit(); + + application_0.addNodeManager(host_0, 1234, nm_0); + application_0.addNodeManager(host_1, 1234, nm_1); + + Resource capability_0_0 = Resources.createResource(1024, 1); + application_0.addResourceRequestSpec(priority_1, capability_0_0); + + Resource capability_0_1 = Resources.createResource(2 * 1024, 1); + application_0.addResourceRequestSpec(priority_0, capability_0_1); + + Task task_0_0 = + new Task(application_0, priority_1, new String[] { host_0 }); + application_0.addTask(task_0_0); + + // Send resource requests to the scheduler + 
application_0.schedule(); + + // Send a heartbeat to kick the tires on the Scheduler + nodeUpdate(nm_0); + SchedulerHealth sh = + ((CapacityScheduler) resourceManager.getResourceScheduler()) + .getSchedulerHealth(); + Assert.assertEquals(1, sh.getAllocationCount().longValue()); + Assert.assertEquals(Resource.newInstance(1024, 1), + sh.getResourcesAllocated()); + Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue()); + Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails() + .getNodeId().toString()); + Assert.assertEquals("root.default", sh.getLastAllocationDetails() + .getQueue()); + + Task task_0_1 = + new Task(application_0, priority_0, new String[] { host_0 }); + application_0.addTask(task_0_1); + application_0.schedule(); + + nodeUpdate(nm_0); + Assert.assertEquals(0, sh.getAllocationCount().longValue()); + Assert.assertEquals(1, sh.getReservationCount().longValue()); + Assert.assertEquals(Resource.newInstance(2 * 1024, 1), + sh.getResourcesReserved()); + Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue()); + Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails() + .getNodeId().toString()); + Assert.assertEquals("root.default", sh.getLastAllocationDetails() + .getQueue()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index aaa615d..4aa818b 
100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1660,7 +1660,7 @@ public class TestCapacityScheduler { CapacityScheduler cs = (CapacityScheduler) resourceManager.getResourceScheduler(); CSQueue origRootQ = cs.getRootQueue(); - CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ); + CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ, cs); int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues()); int origNumAppsRoot = origRootQ.getNumApplications(); @@ -1669,7 +1669,7 @@ public class TestCapacityScheduler { CSQueue newRootQ = cs.getRootQueue(); int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues()); int newNumAppsRoot = newRootQ.getNumApplications(); - CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ); + CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ, cs); CapacitySchedulerLeafQueueInfo origOldA1 = (CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues()); CapacitySchedulerLeafQueueInfo origNewA1 = http://git-wip-us.apache.org/repos/asf/hadoop/blob/afa5d471/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java index eb42679..e028d31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.StringReader; @@ -314,11 +315,14 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase { JSONObject info = json.getJSONObject("scheduler"); assertEquals("incorrect number of elements", 1, info.length()); info = info.getJSONObject("schedulerInfo"); - assertEquals("incorrect number of elements", 6, info.length()); + assertEquals("incorrect number of elements", 7, info.length()); verifyClusterSchedulerGeneric(info.getString("type"), (float) info.getDouble("usedCapacity"), (float) info.getDouble("capacity"), (float) info.getDouble("maxCapacity"), info.getString("queueName")); + JSONObject health = info.getJSONObject("health"); + assertNotNull(health); + assertEquals("incorrect number of elements", 3, health.length()); JSONArray arr = info.getJSONObject("queues").getJSONArray("queue"); assertEquals("incorrect number of elements", 2, arr.length());