Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 69F99200B8F for ; Thu, 15 Sep 2016 23:50:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 68836160ABA; Thu, 15 Sep 2016 21:50:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 40129160AC6 for ; Thu, 15 Sep 2016 23:50:32 +0200 (CEST) Received: (qmail 12745 invoked by uid 500); 15 Sep 2016 21:50:31 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 12610 invoked by uid 99); 15 Sep 2016 21:50:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Sep 2016 21:50:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 00242E08AD; Thu, 15 Sep 2016 21:50:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit From: wangda@apache.org To: common-commits@hadoop.apache.org Date: Thu, 15 Sep 2016 21:50:31 -0000 Message-Id: In-Reply-To: <02bd0474bc5e464d94d2e3b6dc9da5ea@git.apache.org> References: <02bd0474bc5e464d94d2e3b6dc9da5ea@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hadoop git commit: YARN-4091. Add REST API to retrieve scheduler activity. (Chen Ge via wangda) archived-at: Thu, 15 Sep 2016 21:50:34 -0000 YARN-4091. Add REST API to retrieve scheduler activity. 
(Chen Ge via wangda) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ee101e49 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ee101e49 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ee101e49 Branch: refs/heads/branch-2 Commit: ee101e49fadf55b90e583800eea1a736c9fe1d54 Parents: 16aecc6 Author: Wangda Tan Authored: Thu Sep 15 14:49:03 2016 -0700 Committer: Wangda Tan Committed: Thu Sep 15 14:49:03 2016 -0700 ---------------------------------------------------------------------- .../dev-support/findbugs-exclude.xml | 12 + .../scheduler/AbstractYarnScheduler.java | 8 + .../scheduler/activities/ActivitiesLogger.java | 275 +++++++ .../scheduler/activities/ActivitiesManager.java | 320 ++++++++ .../activities/ActivityDiagnosticConstant.java | 77 ++ .../scheduler/activities/ActivityNode.java | 110 +++ .../scheduler/activities/ActivityState.java | 37 + .../activities/AllocationActivity.java | 74 ++ .../scheduler/activities/AllocationState.java | 35 + .../scheduler/activities/AppAllocation.java | 107 +++ .../scheduler/activities/NodeAllocation.java | 140 ++++ .../scheduler/capacity/AbstractCSQueue.java | 6 +- .../scheduler/capacity/CapacityScheduler.java | 39 +- .../capacity/CapacitySchedulerContext.java | 3 + .../scheduler/capacity/LeafQueue.java | 53 +- .../scheduler/capacity/ParentQueue.java | 91 ++- .../allocator/AbstractContainerAllocator.java | 51 +- .../capacity/allocator/ContainerAllocator.java | 10 +- .../allocator/RegularContainerAllocator.java | 97 ++- .../scheduler/common/fica/FiCaSchedulerApp.java | 33 +- .../resourcemanager/webapp/RMWebServices.java | 123 +++ .../webapp/dao/ActivitiesInfo.java | 80 ++ .../webapp/dao/ActivityNodeInfo.java | 67 ++ .../webapp/dao/AppActivitiesInfo.java | 79 ++ .../webapp/dao/AppAllocationInfo.java | 72 ++ .../webapp/dao/NodeAllocationInfo.java | 51 ++ .../webapp/TestRMWebServicesCapacitySched.java | 42 +- 
.../TestRMWebServicesSchedulerActivities.java | 777 +++++++++++++++++++ 28 files changed, 2792 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 6998d75..68d3662 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -502,6 +502,7 @@ + @@ -525,4 +526,15 @@ +   +     +     +       +       +       +       + +     +     +   http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index b8032ac..45415de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -71,6 +71,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity .LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; @@ -97,6 +98,8 @@ public abstract class AbstractYarnScheduler private volatile Priority maxClusterLevelAppPriority; + protected ActivitiesManager activitiesManager; + /* * All schedulers which are inheriting AbstractYarnScheduler should use * concurrent version of 'applications' map. @@ -790,4 +793,9 @@ public abstract class AbstractYarnScheduler } return schedulerChangeRequests; } + + public ActivitiesManager getActivitiesManager() { + return this.activitiesManager; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java new file mode 100644 index 0000000..8fa1bb5 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java @@ -0,0 +1,275 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; + +/** + * Utility for logging scheduler activities + */ +public class ActivitiesLogger { + private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class); + + /** + * Methods for recording activities from an app + */ + public static class APP { + + /* + * Record skipped application activity when no container allocated / + * reserved / re-reserved. 
Scheduler will look at following applications + * within the same leaf queue. + */ + public static void recordSkippedAppActivityWithoutAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic) { + recordAppActivityWithoutAllocation(activitiesManager, node, application, + priority, diagnostic, ActivityState.SKIPPED); + } + + /* + * Record application activity when rejected because of queue maximum + * capacity or user limit. + */ + public static void recordRejectedAppActivityFromLeafQueue( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic) { + String type = "app"; + recordActivity(activitiesManager, node, application.getQueueName(), + application.getApplicationId().toString(), priority, + ActivityState.REJECTED, diagnostic, type); + finishSkippedAppAllocationRecording(activitiesManager, + application.getApplicationId(), ActivityState.REJECTED, diagnostic); + } + + /* + * Record application activity when no container allocated / + * reserved / re-reserved. Scheduler will look at following applications + * within the same leaf queue. + */ + public static void recordAppActivityWithoutAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Priority priority, + String diagnostic, ActivityState appState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + String type = "container"; + // Add application-container activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getApplicationId().toString(), null, + priority.toString(), ActivityState.SKIPPED, diagnostic, type); + type = "app"; + // Add queue-application activity into specific node allocation. 
+ activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY, type); + } + // Add application-container activity into specific application allocation + // Under this condition, it fails to allocate a container to this + // application, so containerId is null. + if (activitiesManager.shouldRecordThisApp( + application.getApplicationId())) { + String type = "container"; + activitiesManager.addSchedulingActivityForApp( + application.getApplicationId(), null, priority.toString(), appState, + diagnostic, type); + } + } + + /* + * Record application activity when container allocated / reserved / + * re-reserved + */ + public static void recordAppActivityWithAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + SchedulerApplicationAttempt application, Container updatedContainer, + ActivityState activityState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + String type = "container"; + // Add application-container activity into specific node allocation. + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getApplicationId().toString(), + updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), activityState, + ActivityDiagnosticConstant.EMPTY, type); + type = "app"; + // Add queue-application activity into specific node allocation. 
+ activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + application.getQueueName(), + application.getApplicationId().toString(), + application.getPriority().toString(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY, type); + } + // Add application-container activity into specific application allocation + if (activitiesManager.shouldRecordThisApp( + application.getApplicationId())) { + String type = "container"; + activitiesManager.addSchedulingActivityForApp( + application.getApplicationId(), updatedContainer.getId().toString(), + updatedContainer.getPriority().toString(), activityState, + ActivityDiagnosticConstant.EMPTY, type); + } + } + + /* + * Invoked when scheduler starts to look at this application within one node + * update. + */ + public static void startAppAllocationRecording( + ActivitiesManager activitiesManager, NodeId nodeId, long currentTime, + SchedulerApplicationAttempt application) { + if (activitiesManager == null) { + return; + } + activitiesManager.startAppAllocationRecording(nodeId, currentTime, + application); + } + + /* + * Invoked when scheduler finishes looking at this application within one + * node update, and the app has any container allocated/reserved during + * this allocation. + */ + public static void finishAllocatedAppAllocationRecording( + ActivitiesManager activitiesManager, ApplicationId applicationId, + ContainerId containerId, ActivityState containerState, + String diagnostic) { + if (activitiesManager == null) { + return; + } + + if (activitiesManager.shouldRecordThisApp(applicationId)) { + activitiesManager.finishAppAllocationRecording(applicationId, + containerId, containerState, diagnostic); + } + } + + /* + * Invoked when scheduler finishes looking at this application within one + * node update, and the app DOESN'T have any container allocated/reserved + * during this allocation. 
+ */ + public static void finishSkippedAppAllocationRecording( + ActivitiesManager activitiesManager, ApplicationId applicationId, + ActivityState containerState, String diagnostic) { + finishAllocatedAppAllocationRecording(activitiesManager, applicationId, + null, containerState, diagnostic); + } + } + + /** + * Methods for recording activities from a queue + */ + public static class QUEUE { + /* + * Record activities of a queue + */ + public static void recordQueueActivity(ActivitiesManager activitiesManager, + SchedulerNode node, String parentQueueName, String queueName, + ActivityState state, String diagnostic) { + recordActivity(activitiesManager, node, parentQueueName, queueName, null, + state, diagnostic, null); + } + } + + /** + * Methods for recording overall activities from one node update + */ + public static class NODE { + + /* + * Invoked when node allocation finishes, and there's NO container + * allocated or reserved during the allocation + */ + public static void finishSkippedNodeAllocation( + ActivitiesManager activitiesManager, SchedulerNode node) { + finishAllocatedNodeAllocation(activitiesManager, node, null, + AllocationState.SKIPPED); + } + + /* + * Invoked when node allocation finishes, and there's any container + * allocated or reserved during the allocation + */ + public static void finishAllocatedNodeAllocation( + ActivitiesManager activitiesManager, SchedulerNode node, + ContainerId containerId, AllocationState containerState) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + activitiesManager.updateAllocationFinalState(node.getNodeID(), + containerId, containerState); + } + } + + /* + * Invoked when node heartbeat finishes + */ + public static void finishNodeUpdateRecording( + ActivitiesManager activitiesManager, NodeId nodeID) { + if (activitiesManager == null) { + return; + } + activitiesManager.finishNodeUpdateRecording(nodeID); + } + + /* + * Invoked when node 
heartbeat starts + */ + public static void startNodeUpdateRecording( + ActivitiesManager activitiesManager, NodeId nodeID) { + if (activitiesManager == null) { + return; + } + activitiesManager.startNodeUpdateRecording(nodeID); + } + } + + // Add queue, application or container activity into specific node allocation. + private static void recordActivity(ActivitiesManager activitiesManager, + SchedulerNode node, String parentName, String childName, + Priority priority, ActivityState state, String diagnostic, String type) { + if (activitiesManager == null) { + return; + } + if (activitiesManager.shouldRecordThisNode(node.getNodeID())) { + activitiesManager.addSchedulingActivityForNode(node.getNodeID(), + parentName, childName, priority != null ? priority.toString() : null, + state, diagnostic, type); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java new file mode 100644 index 0000000..bfab770 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; +import org.apache.hadoop.yarn.util.SystemClock; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.List; +import java.util.Set; +import java.util.*; +import java.util.ArrayList; + +/** + * A class to store node or application allocations. + * It mainly contains operations for allocation start, add, update and finish. 
+ */ +public class ActivitiesManager extends AbstractService { + private static final Log LOG = LogFactory.getLog(ActivitiesManager.class); + private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation; + private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations; + private Set<NodeId> activeRecordedNodes; + private ConcurrentMap<ApplicationId, Long> + recordingAppActivitiesUntilSpecifiedTime; + private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation; + private ConcurrentMap<ApplicationId, List<AppAllocation>> + completedAppAllocations; + private boolean recordNextAvailableNode = false; + private List<NodeAllocation> lastAvailableNodeActivities = null; + private Thread cleanUpThread; + private int timeThreshold = 600 * 1000; + private final RMContext rmContext; + + public ActivitiesManager(RMContext rmContext) { + super(ActivitiesManager.class.getName()); + recordingNodesAllocation = new ConcurrentHashMap<>(); + completedNodeAllocations = new ConcurrentHashMap<>(); + appsAllocation = new ConcurrentHashMap<>(); + completedAppAllocations = new ConcurrentHashMap<>(); + activeRecordedNodes = Collections.newSetFromMap( + new ConcurrentHashMap<NodeId, Boolean>()); + recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>(); + this.rmContext = rmContext; + } + + public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) { + if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus() + == FinalApplicationStatus.UNDEFINED) { + List<AppAllocation> allocations = completedAppAllocations.get( + applicationId); + + return new AppActivitiesInfo(allocations, applicationId); + } else { + return new AppActivitiesInfo( + "fail to get application activities after finished", + applicationId.toString()); + } + } + + public ActivitiesInfo getActivitiesInfo(String nodeId) { + List<NodeAllocation> allocations; + if (nodeId == null) { + allocations = lastAvailableNodeActivities; + } else { + allocations = completedNodeAllocations.get(NodeId.fromString(nodeId)); + } + return new ActivitiesInfo(allocations, nodeId); + } + + public void recordNextNodeUpdateActivities(String nodeId) { + if (nodeId == null) { +
recordNextAvailableNode = true; + } else { + activeRecordedNodes.add(NodeId.fromString(nodeId)); + } + } + + public void turnOnAppActivitiesRecording(ApplicationId applicationId, + double maxTime) { + long startTS = SystemClock.getInstance().getTime(); + long endTS = startTS + (long) (maxTime * 1000); + recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS); + } + + @Override + protected void serviceStart() throws Exception { + cleanUpThread = new Thread(new Runnable() { + @Override + public void run() { + while (true) { + Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite = + completedNodeAllocations.entrySet().iterator(); + while (ite.hasNext()) { + Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation = ite.next(); + List<NodeAllocation> allocations = nodeAllocation.getValue(); + long currTS = SystemClock.getInstance().getTime(); + if (allocations.size() > 0 && allocations.get(0).getTimeStamp() + - currTS > timeThreshold) { + ite.remove(); + } + } + + Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp = + completedAppAllocations.entrySet().iterator(); + while (iteApp.hasNext()) { + Map.Entry<ApplicationId, List<AppAllocation>> appAllocation = + iteApp.next(); + if (rmContext.getRMApps().get(appAllocation.getKey()) + .getFinalApplicationStatus() + != FinalApplicationStatus.UNDEFINED) { + iteApp.remove(); + } + } + + try { + Thread.sleep(5000); + } catch (Exception e) { + // ignore + } + } + } + }); + + cleanUpThread.start(); + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + cleanUpThread.interrupt(); + super.serviceStop(); + } + + void startNodeUpdateRecording(NodeId nodeID) { + if (recordNextAvailableNode) { + recordNextNodeUpdateActivities(nodeID.toString()); + } + if (activeRecordedNodes.contains(nodeID)) { + List<NodeAllocation> nodeAllocation = new ArrayList<>(); + recordingNodesAllocation.put(nodeID, nodeAllocation); + } + } + + void startAppAllocationRecording(NodeId nodeID, long currTS, + SchedulerApplicationAttempt application) { + ApplicationId applicationId = application.getApplicationId(); + + if
(recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + > currTS) { + appsAllocation.put(applicationId, + new AppAllocation(application.getPriority(), nodeID, + application.getQueueName())); + } + + if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) + && recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + <= currTS) { + turnOffActivityMonitoringForApp(applicationId); + } + } + + // Add queue, application or container activity into specific node allocation. + void addSchedulingActivityForNode(NodeId nodeID, String parentName, + String childName, String priority, ActivityState state, String diagnostic, + String type) { + if (shouldRecordThisNode(nodeID)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + nodeAllocation.addAllocationActivity(parentName, childName, priority, + state, diagnostic, type); + } + } + + // Add queue, application or container activity into specific application + // allocation. + void addSchedulingActivityForApp(ApplicationId applicationId, + String containerId, String priority, ActivityState state, + String diagnostic, String type) { + if (shouldRecordThisApp(applicationId)) { + AppAllocation appAllocation = appsAllocation.get(applicationId); + appAllocation.addAppAllocationActivity(containerId, priority, state, + diagnostic, type); + } + } + + // Update container allocation meta status for this node allocation. + // It updates general container status but not the detailed activity state + // in updateActivityState. 
+ void updateAllocationFinalState(NodeId nodeID, ContainerId containerId, + AllocationState containerState) { + if (shouldRecordThisNode(nodeID)) { + NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID); + nodeAllocation.updateContainerState(containerId, containerState); + } + } + + void finishAppAllocationRecording(ApplicationId applicationId, + ContainerId containerId, ActivityState appState, String diagnostic) { + if (shouldRecordThisApp(applicationId)) { + long currTS = SystemClock.getInstance().getTime(); + AppAllocation appAllocation = appsAllocation.remove(applicationId); + appAllocation.updateAppContainerStateAndTime(containerId, appState, + currTS, diagnostic); + + List<AppAllocation> appAllocations; + if (completedAppAllocations.containsKey(applicationId)) { + appAllocations = completedAppAllocations.get(applicationId); + } else { + appAllocations = new ArrayList<>(); + completedAppAllocations.put(applicationId, appAllocations); + } + if (appAllocations.size() == 1000) { + appAllocations.remove(0); + } + appAllocations.add(appAllocation); + + if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId) + <= currTS) { + turnOffActivityMonitoringForApp(applicationId); + } + } + } + + void finishNodeUpdateRecording(NodeId nodeID) { + List<NodeAllocation> value = recordingNodesAllocation.get(nodeID); + long timeStamp = SystemClock.getInstance().getTime(); + + if (value != null) { + if (value.size() > 0) { + lastAvailableNodeActivities = value; + for (NodeAllocation allocation : lastAvailableNodeActivities) { + allocation.transformToTree(); + allocation.setTimeStamp(timeStamp); + } + if (recordNextAvailableNode) { + recordNextAvailableNode = false; + } + } + + if (shouldRecordThisNode(nodeID)) { + recordingNodesAllocation.remove(nodeID); + completedNodeAllocations.put(nodeID, value); + stopRecordNodeUpdateActivities(nodeID); + } + } + } + + boolean shouldRecordThisApp(ApplicationId applicationId) { + return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId) +
&& appsAllocation.containsKey(applicationId); + } + + boolean shouldRecordThisNode(NodeId nodeID) { + return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation + .containsKey(nodeID); + } + + private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) { + List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID); + NodeAllocation nodeAllocation; + // When this node has already stored allocation activities, get the + // last allocation for this node. + if (nodeAllocations.size() != 0) { + nodeAllocation = nodeAllocations.get(nodeAllocations.size() - 1); + // When final state in last allocation is not DEFAULT, it means + // last allocation has finished. Create a new allocation for this node, + // and add it to the allocation list. Return this new allocation. + // + // When final state in last allocation is DEFAULT, + // it means last allocation has not finished. Just get last allocation. + if (nodeAllocation.getFinalAllocationState() != AllocationState.DEFAULT) { + nodeAllocation = new NodeAllocation(nodeID); + nodeAllocations.add(nodeAllocation); + } + } + // When this node has not stored allocation activities, + // create a new allocation for this node, and add it to the allocation list. + // Return this new allocation.
+ else { + nodeAllocation = new NodeAllocation(nodeID); + nodeAllocations.add(nodeAllocation); + } + return nodeAllocation; + } + + private void stopRecordNodeUpdateActivities(NodeId nodeId) { + activeRecordedNodes.remove(nodeId); + } + + private void turnOffActivityMonitoringForApp(ApplicationId applicationId) { + recordingAppActivitiesUntilSpecifiedTime.remove(applicationId); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java new file mode 100644 index 0000000..fc4738e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityDiagnosticConstant.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Collection of the diagnostic messages that can be attached to a recorded
 * scheduler activity.
 */
public class ActivityDiagnosticConstant {
  /**
   * Marks an activity that has no diagnostic to display. The value is
   * {@code null} on purpose so that the frontend does not render a
   * "diagnostic" line for it.
   */
  public static final String EMPTY = null;

  public static final String NOT_ABLE_TO_ACCESS_PARTITION =
      "Not able to access partition";

  public static final String QUEUE_DO_NOT_NEED_MORE_RESOURCE =
      "Queue does not need more resource";

  public static final String QUEUE_MAX_CAPACITY_LIMIT =
      "Hit queue max-capacity limit";

  public static final String USER_CAPACITY_MAXIMUM_LIMIT =
      "Hit user capacity maximum limit";

  public static final String SKIP_BLACK_LISTED_NODE =
      "Skip black listed node";

  public static final String PRIORITY_SKIPPED =
      "Priority skipped";

  public static final String PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST =
      "Priority skipped because off-switch request is null";

  public static final String
      PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST =
      "Priority skipped because partition of node doesn't match request";

  public static final String SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY =
      "Priority skipped because of relax locality is not allowed";

  public static final String SKIP_IN_IGNORE_EXCLUSIVITY_MODE =
      "Skipping assigning to Node in Ignore Exclusivity mode";

  public static final String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS =
      "Doesn't need containers based on reservation algo!";

  public static final String QUEUE_SKIPPED_HEADROOM =
      "Queue skipped because of headroom";

  public static final String NON_PARTITIONED_PARTITION_FIRST =
      "Non-partitioned resource request should be scheduled to "
          + "non-partitioned partition first";

  public static final String SKIP_NODE_LOCAL_REQUEST =
      "Skip node-local request";

  public static final String SKIP_RACK_LOCAL_REQUEST =
      "Skip rack-local request";

  public static final String SKIP_OFF_SWITCH_REQUEST =
      "Skip offswitch request";

  public static final String REQUEST_CAN_NOT_ACCESS_NODE_LABEL =
      "Resource request can not access the label";

  public static final String NOT_SUFFICIENT_RESOURCE =
      "Node does not have sufficient resource for request";

  public static final String LOCALITY_SKIPPED =
      "Locality skipped";

  public static final String FAIL_TO_ALLOCATE =
      "Fail to allocate";

  public static final String COULD_NOT_GET_CONTAINER =
      "Couldn't get container for allocation";

  public static final String APPLICATION_DO_NOT_NEED_RESOURCE =
      "Application does not need more resource";

  public static final String APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE =
      "Application priority does not need more resource";

  public static final String SKIPPED_ALL_PRIORITIES =
      "All priorities are skipped of the app";

  public static final String RESPECT_FIFO =
      "To respect FIFO of applications, "
          + "skipped following applications in the queue";
}
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java new file mode 100644 index 0000000..a03814c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityNode.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import java.util.LinkedList; +import java.util.List; + +/* + * It represents tree node in "NodeAllocation" tree structure. + * Each node may represent queue, application or container in allocation activity. + * Node may have children node if successfully allocated to next level. 
+ */ +public class ActivityNode { + private String activityNodeName; + private String parentName; + private String appPriority; + private String requestPriority; + private ActivityState state; + private String diagnostic; + + private List childNode; + + public ActivityNode(String activityNodeName, String parentName, + String priority, ActivityState state, String diagnostic, String type) { + this.activityNodeName = activityNodeName; + this.parentName = parentName; + if (type != null) { + if (type.equals("app")) { + this.appPriority = priority; + } else if (type.equals("container")) { + this.requestPriority = priority; + } + } + this.state = state; + this.diagnostic = diagnostic; + this.childNode = new LinkedList<>(); + } + + public String getName() { + return this.activityNodeName; + } + + public String getParentName() { + return this.parentName; + } + + public void addChild(ActivityNode node) { + childNode.add(0, node); + } + + public List getChildren() { + return this.childNode; + } + + public ActivityState getState() { + return this.state; + } + + public String getDiagnostic() { + return this.diagnostic; + } + + public String getAppPriority() { + return appPriority; + } + + public String getRequestPriority() { + return requestPriority; + } + + public boolean getType() { + if (appPriority != null) { + return true; + } else { + return false; + } + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(this.activityNodeName + " "); + sb.append(this.appPriority + " "); + sb.append(this.state + " "); + if (!this.diagnostic.equals("")) { + sb.append(this.diagnostic + "\n"); + } + sb.append("\n"); + for (ActivityNode child : childNode) { + sb.append(child.toString() + "\n"); + } + return sb.toString(); + } + +} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java new file mode 100644 index 0000000..bce1fc9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivityState.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +/* + * Collection of activity operation states. 
/**
 * States that a single recorded activity operation can be in.
 */
public enum ActivityState {
  /** Default state when adding a new activity in node allocation. */
  DEFAULT,

  /**
   * Container is allocated to sub-queues/applications or to this
   * queue/application.
   */
  ACCEPTED,

  /**
   * Queue or application voluntarily gave up using the resource, or nothing
   * was allocated.
   */
  SKIPPED,

  /** Container could not be allocated to sub-queues or this application. */
  REJECTED,

  /** Successfully allocated a new non-reserved container. */
  ALLOCATED,

  /** Successfully reserved a new container. */
  RESERVED,

  /**
   * Successfully reserved a new container.
   * NOTE(review): same wording as RESERVED in the original; presumably this
   * marks a repeated reservation -- confirm against the callers.
   */
  RE_RESERVED
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/* + * It records an activity operation in allocation, + * which can be classified as queue, application or container activity. + * Other information include state, diagnostic, priority. + */ +public class AllocationActivity { + private String childName = null; + private String parentName = null; + private String appPriority = null; + private String requestPriority = null; + private ActivityState state; + private String diagnostic = null; + + private static final Log LOG = LogFactory.getLog(AllocationActivity.class); + + public AllocationActivity(String parentName, String queueName, + String priority, ActivityState state, String diagnostic, String type) { + this.childName = queueName; + this.parentName = parentName; + if (type != null) { + if (type.equals("app")) { + this.appPriority = priority; + } else if (type.equals("container")) { + this.requestPriority = priority; + } + } + this.state = state; + this.diagnostic = diagnostic; + } + + public ActivityNode createTreeNode() { + if (appPriority != null) { + return new ActivityNode(this.childName, this.parentName, this.appPriority, + this.state, this.diagnostic, "app"); + } else if (requestPriority != null) { + return new ActivityNode(this.childName, this.parentName, + this.requestPriority, this.state, this.diagnostic, "container"); + } else { + return new ActivityNode(this.childName, this.parentName, null, this.state, + 
this.diagnostic, null); + } + } + + public String getName() { + return this.childName; + } + + public String getState() { + return this.state.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java new file mode 100644 index 0000000..e38cefc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AllocationState.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Final states that an allocation can end up in.
 */
public enum AllocationState {
  /** State an allocation starts with (undocumented in the original). */
  DEFAULT,

  /**
   * Queue or application voluntarily gave up using the resource, or nothing
   * was allocated.
   */
  SKIPPED,

  /** Successfully allocated a new non-reserved container. */
  ALLOCATED,

  /**
   * Successfully allocated a new container from an existing reserved
   * container.
   */
  ALLOCATED_FROM_RESERVED,

  /** Successfully reserved a new container. */
  RESERVED
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; + +import java.util.ArrayList; +import java.util.List; + +/* + * It contains allocation information for one application within a period of + * time. + * Each application allocation may have several allocation attempts. + */ +public class AppAllocation { + private Priority priority = null; + private NodeId nodeId; + private ContainerId containerId = null; + private ActivityState appState = null; + private String diagnostic = null; + private String queueName = null; + private List allocationAttempts; + private long timestamp; + + public AppAllocation(Priority priority, NodeId nodeId, String queueName) { + this.priority = priority; + this.nodeId = nodeId; + this.allocationAttempts = new ArrayList<>(); + this.queueName = queueName; + } + + public void updateAppContainerStateAndTime(ContainerId containerId, + ActivityState appState, long ts, String diagnostic) { + this.timestamp = ts; + this.containerId = containerId; + this.appState = appState; + this.diagnostic = diagnostic; + } + + public void addAppAllocationActivity(String containerId, String priority, + ActivityState state, String diagnostic, String type) { + ActivityNode container = new ActivityNode(containerId, null, priority, + state, diagnostic, type); + this.allocationAttempts.add(container); + if (state == ActivityState.REJECTED) { + this.appState = 
ActivityState.SKIPPED; + } else { + this.appState = state; + } + } + + public String getNodeId() { + return nodeId.toString(); + } + + public String getQueueName() { + return queueName; + } + + public ActivityState getAppState() { + return appState; + } + + public String getPriority() { + if (priority == null) { + return null; + } + return priority.toString(); + } + + public String getContainerId() { + if (containerId == null) { + return null; + } + return containerId.toString(); + } + + public String getDiagnostic() { + return diagnostic; + } + + public long getTime() { + return this.timestamp; + } + + public List getAllocationAttempts() { + return allocationAttempts; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java new file mode 100644 index 0000000..5a8f955 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/NodeAllocation.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/* + * It contains allocation information for one allocation in a node heartbeat. + * Detailed allocation activities are first stored in "AllocationActivity" + * as operations, then transformed to a tree structure. + * Tree structure starts from root queue and ends in leaf queue, + * application or container allocation. 
+ */ +public class NodeAllocation { + private NodeId nodeId; + private long timeStamp; + private ContainerId containerId = null; + private AllocationState containerState = AllocationState.DEFAULT; + private List allocationOperations; + + private ActivityNode root = null; + + private static final Log LOG = LogFactory.getLog(NodeAllocation.class); + + public NodeAllocation(NodeId nodeId) { + this.nodeId = nodeId; + this.allocationOperations = new ArrayList<>(); + } + + public void addAllocationActivity(String parentName, String childName, + String priority, ActivityState state, String diagnostic, String type) { + AllocationActivity allocate = new AllocationActivity(parentName, childName, + priority, state, diagnostic, type); + this.allocationOperations.add(allocate); + } + + public void updateContainerState(ContainerId containerId, + AllocationState containerState) { + this.containerId = containerId; + this.containerState = containerState; + } + + // In node allocation, transform each activity to a tree-like structure + // for frontend activity display. 
+ // eg: root + // / \ + // a b + // / \ + // app1 app2 + // / \ + // CA1 CA2 + // CA means Container Attempt + public void transformToTree() { + List allocationTree = new ArrayList<>(); + + if (root == null) { + Set names = Collections.newSetFromMap( + new ConcurrentHashMap()); + ListIterator ite = allocationOperations.listIterator( + allocationOperations.size()); + while (ite.hasPrevious()) { + String name = ite.previous().getName(); + if (name != null) { + if (!names.contains(name)) { + names.add(name); + } else { + ite.remove(); + } + } + } + + for (AllocationActivity allocationOperation : allocationOperations) { + ActivityNode node = allocationOperation.createTreeNode(); + String name = node.getName(); + for (int i = allocationTree.size() - 1; i > -1; i--) { + if (allocationTree.get(i).getParentName().equals(name)) { + node.addChild(allocationTree.get(i)); + allocationTree.remove(i); + } else { + break; + } + } + allocationTree.add(node); + } + root = allocationTree.get(0); + } + } + + public void setTimeStamp(long timeStamp) { + this.timeStamp = timeStamp; + } + + public long getTimeStamp() { + return this.timeStamp; + } + + public AllocationState getFinalAllocationState() { + return containerState; + } + + public String getContainerId() { + if (containerId == null) + return null; + return containerId.toString(); + } + + public ActivityNode getRoot() { + return root; + } + + public String getNodeId() { + return nodeId.toString(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9c88154..1d8f929 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -91,12 +92,15 @@ public abstract class AbstractCSQueue implements CSQueue { protected CapacitySchedulerContext csContext; protected YarnAuthorizationProvider authorizer = null; - public AbstractCSQueue(CapacitySchedulerContext cs, + protected ActivitiesManager activitiesManager; + + public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this.labelManager = cs.getRMContext().getNodeLabelManager(); this.parent = parent; this.queueName = queueName; this.resourceCalculator = cs.getResourceCalculator(); + this.activitiesManager = cs.getActivitiesManager(); // must be called after parent and queueName is set this.metrics = 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 54dcfdf..5696c71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -109,6 +109,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicE import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation; @@ -308,6 +314,8 @@ public class CapacityScheduler extends this.applications = new ConcurrentHashMap<>(); this.labelManager = rmContext.getNodeLabelManager(); authorizer = YarnAuthorizationProvider.getInstance(yarnConf); + this.activitiesManager = new ActivitiesManager(rmContext); + activitiesManager.init(conf); initializeQueues(this.conf); this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled(); @@ -345,6 +353,7 @@ public class CapacityScheduler extends @Override public void serviceStart() throws Exception { startSchedulerThreads(); + activitiesManager.start(); super.serviceStart(); } @@ -524,7 +533,7 @@ public class CapacityScheduler extends Map newQueues = new HashMap(); CSQueue newRoot = parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, - newQueues, queues, noop); + newQueues, queues, noop); // Ensure all existing queues are still present validateExistingQueues(queues, newQueues); @@ -651,7 +660,7 @@ public class CapacityScheduler extends throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = + ParentQueue parentQueue = new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName)); // Used only for unit tests @@ -803,7 +812,7 @@ public class CapacityScheduler extends FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, application.getUser(), queue, queue.getActiveUsersManager(), rmContext, - application.getPriority(), isAttemptRecovering); + application.getPriority(), isAttemptRecovering, activitiesManager); if (transferStateFromPreviousAttempt) { 
attempt.transferStateFromPreviousAttempt( application.getCurrentAppAttempt()); @@ -1234,6 +1243,7 @@ public class CapacityScheduler extends RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { + FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(reservedContainer.getContainerId()); @@ -1263,6 +1273,19 @@ public class CapacityScheduler extends tmp.getAssignmentInformation().incrAllocations(); updateSchedulerHealth(lastNodeUpdateTime, node, tmp); schedulerHealth.updateSchedulerFulfilledReservationCounts(1); + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), + AllocationState.ALLOCATED_FROM_RESERVED); + } else { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + queue.getParent().getQueueName(), queue.getQueueName(), + ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY); + ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager, + node, reservedContainer.getContainerId(), AllocationState.SKIPPED); } } @@ -1372,7 +1395,11 @@ public class CapacityScheduler extends setLastNodeUpdateTime(Time.now()); nodeUpdate(node); if (!scheduleAsynchronously) { + ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager, + node.getNodeID()); allocateContainersToNode(getNode(node.getNodeID())); + ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager, + node.getNodeID()); } } break; @@ -1946,13 +1973,9 @@ public class CapacityScheduler extends return targetQueueName; } - /** + /* * Check application can be moved to queue with labels enabled. 
All labels in * application life time will be checked - * - * @param appId - * @param dest - * @throws YarnException */ private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest) throws YarnException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index b39b289..c41a7bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; @@ -80,4 +81,6 @@ 
public interface CapacitySchedulerContext { * cluster. */ ResourceUsage getClusterResourceUsage(); + + ActivitiesManager getActivitiesManager(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 3594bb0..109df3c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -58,6 +58,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -65,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.server.utils.Lock.NoLock; +import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -142,7 +146,7 @@ public class LeafQueue extends AbstractCSQueue { super(cs, queueName, parent, old); this.scheduler = cs; - this.activeUsersManager = new ActiveUsersManager(metrics); + this.activeUsersManager = new ActiveUsersManager(metrics); // One time initialization is enough since it is static ordering policy this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps(); @@ -153,7 +157,7 @@ public class LeafQueue extends AbstractCSQueue { LOG.debug("LeafQueue:" + " name=" + queueName + ", fullname=" + getQueuePath()); } - + setupQueueConfigs(cs.getClusterResource()); } @@ -872,7 +876,7 @@ public class LeafQueue extends AbstractCSQueue { float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition); limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity); } - + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits currentResourceLimits, @@ -891,6 +895,10 @@ public class LeafQueue extends AbstractCSQueue { if (reservedContainer != null) { FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId()); + + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); + synchronized (application) { CSAssignment assignment = 
application.assignContainers(clusterResource, node, @@ -905,6 +913,10 @@ public class LeafQueue extends AbstractCSQueue { // if our queue cannot access this node, just return if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY && !accessibleToPartition(node.getPartition())) { + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); return CSAssignment.NULL_ASSIGNMENT; } @@ -917,6 +929,9 @@ public class LeafQueue extends AbstractCSQueue { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); return CSAssignment.NULL_ASSIGNMENT; } @@ -924,13 +939,23 @@ public class LeafQueue extends AbstractCSQueue { orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) { FiCaSchedulerApp application = assignmentIterator.next(); + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, + node.getNodeID(), SystemClock.getInstance().getTime(), application); + // Check queue max-capacity limit if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), currentResourceLimits, application.getCurrentReservation(), schedulingMode)) { + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, + application, application.getPriority(), + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); return CSAssignment.NULL_ASSIGNMENT; } - + Resource userLimit = computeUserLimitAndSetHeadroom(application, 
clusterResource, node.getPartition(), schedulingMode); @@ -940,6 +965,10 @@ public class LeafQueue extends AbstractCSQueue { application, node.getPartition(), currentResourceLimits)) { application.updateAMContainerDiagnostics(AMState.ACTIVATED, "User capacity has reached its maximum limit."); + ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue( + activitiesManager, node, + application, application.getPriority(), + ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT); continue; } @@ -981,10 +1010,17 @@ public class LeafQueue extends AbstractCSQueue { incReservedResource(node.getPartition(), reservedRes); } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + // Done return assignment; } else if (assignment.getSkippedType() == CSAssignment.SkippedType.OTHER) { + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); application.updateNodeInfoForAMDiagnostics(node); } else if(assignment.getSkippedType() == CSAssignment.SkippedType.QUEUE_LIMIT) { @@ -992,9 +1028,18 @@ public class LeafQueue extends AbstractCSQueue { } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.RESPECT_FIFO); + ActivitiesLogger.APP.finishSkippedAppAllocationRecording( + activitiesManager, application.getApplicationId(), + ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY); return CSAssignment.NULL_ASSIGNMENT; } } + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); return 
CSAssignment.NULL_ASSIGNMENT; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ee101e49/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 9ae35ee..a245e3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -42,12 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.*; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.resource.Resources; @@ -81,7 +80,7 @@ public class ParentQueue extends AbstractCSQueue { private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - public ParentQueue(CapacitySchedulerContext cs, + public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; @@ -98,14 +97,14 @@ public class ParentQueue extends AbstractCSQueue { "capacity of " + rawCapacity + " for queue " + queueName + ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - + this.childQueues = new TreeSet(nonPartitionedQueueComparator); - + setupQueueConfigs(cs.getClusterResource()); - LOG.info("Initialized parent-queue " + queueName + - " name=" + queueName + - ", fullname=" + getQueuePath()); + LOG.info("Initialized parent-queue " + queueName + + " name=" + queueName + + ", fullname=" + getQueuePath()); } synchronized void setupQueueConfigs(Resource clusterResource) @@ -380,6 +379,10 @@ public class ParentQueue extends AbstractCSQueue { " #applications: " + getNumApplications()); } + private String getParentName() { + return getParent() != null ? 
getParent().getQueueName() : ""; + } + @Override public synchronized CSAssignment assignContainers(Resource clusterResource, FiCaSchedulerNode node, ResourceLimits resourceLimits, @@ -392,6 +395,16 @@ public class ParentQueue extends AbstractCSQueue { + ", because it is not able to access partition=" + node .getPartition()); } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.REJECTED, + ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node + .getPartition()); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + return CSAssignment.NULL_ASSIGNMENT; } @@ -404,6 +417,15 @@ public class ParentQueue extends AbstractCSQueue { + ", because it doesn't need more resource, schedulingMode=" + schedulingMode.name() + " node-partition=" + node.getPartition()); } + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + return CSAssignment.NULL_ASSIGNMENT; } @@ -423,9 +445,18 @@ public class ParentQueue extends AbstractCSQueue { resourceLimits, Resources.createResource( getMetrics().getReservedMB(), getMetrics() .getReservedVirtualCores()), schedulingMode)) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + break; } - + // Schedule CSAssignment assignedToChild = assignContainersToChildQueues(clusterResource, node, resourceLimits, @@ -436,6 +467,29 @@ public class ParentQueue extends AbstractCSQueue { if (Resources.greaterThan( resourceCalculator, clusterResource, 
assignedToChild.getResource(), Resources.none())) { + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.ACCEPTED, + ActivityDiagnosticConstant.EMPTY); + + if (node.getReservedContainer() == null) { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.ALLOCATED); + } + } else { + if (rootQueue) { + ActivitiesLogger.NODE.finishAllocatedNodeAllocation( + activitiesManager, node, + assignedToChild.getAssignmentInformation() + .getFirstAllocatedOrReservedContainerId(), + AllocationState.RESERVED); + } + } + // Track resource utilization for the parent-queue allocateResource(clusterResource, assignedToChild.getResource(), node.getPartition(), assignedToChild.isIncreasedAllocation()); @@ -474,6 +528,15 @@ public class ParentQueue extends AbstractCSQueue { } else { assignment.setSkippedType(assignedToChild.getSkippedType()); + + ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, + getParentName(), getQueueName(), ActivityState.SKIPPED, + ActivityDiagnosticConstant.EMPTY); + if (rootQueue) { + ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, + node); + } + break; } @@ -631,7 +694,7 @@ public class ParentQueue extends AbstractCSQueue { resourceToSubtract); } } - + return assignment; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org