Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 433BD11C0C for ; Tue, 19 Aug 2014 23:53:52 +0000 (UTC) Received: (qmail 60374 invoked by uid 500); 19 Aug 2014 23:53:52 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 60325 invoked by uid 500); 19 Aug 2014 23:53:52 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 60313 invoked by uid 99); 19 Aug 2014 23:53:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Aug 2014 23:53:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Aug 2014 23:53:47 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C0C092388CFC; Tue, 19 Aug 2014 23:51:19 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1619012 [17/26] - in /hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-api/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api... Date: Tue, 19 Aug 2014 23:51:01 -0000 To: yarn-commits@hadoop.apache.org From: cmccabe@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140819235119.C0C092388CFC@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java Tue Aug 19 23:49:39 2014 @@ -18,17 +18,24 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.Groups; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.w3c.dom.Element; import org.w3c.dom.NamedNodeMap; import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import com.google.common.annotations.VisibleForTesting; + +@Private +@Unstable public abstract class QueuePlacementRule { protected boolean create; @@ -58,16 +65,20 @@ public abstract class QueuePlacementRule * continue to the next rule, and null indicates that the app should be rejected. */ public String assignAppToQueue(String requestedQueue, String user, - Groups groups, Collection configuredQueues) throws IOException { - String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues); - if (create || configuredQueues.contains(queue)) { + Groups groups, Map> configuredQueues) + throws IOException { + String queue = getQueueForApp(requestedQueue, user, groups, + configuredQueues); + if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue) + || configuredQueues.get(FSQueueType.PARENT).contains(queue)) { return queue; } else { return ""; } } - public void initializeFromXml(Element el) { + public void initializeFromXml(Element el) + throws AllocationConfigurationException { boolean create = true; NamedNodeMap attributes = el.getAttributes(); Map args = new HashMap(); @@ -104,15 +115,16 @@ public abstract class QueuePlacementRule * continue to the next rule. */ protected abstract String getQueueForApp(String requestedQueue, String user, - Groups groups, Collection configuredQueues) throws IOException; + Groups groups, Map> configuredQueues) + throws IOException; /** * Places apps in queues by username of the submitter */ public static class User extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, Collection configuredQueues) { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) { return "root." + user; } @@ -127,9 +139,9 @@ public abstract class QueuePlacementRule */ public static class PrimaryGroup extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, - Collection configuredQueues) throws IOException { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) + throws IOException { return "root." + groups.getGroups(user).get(0); } @@ -147,12 +159,15 @@ public abstract class QueuePlacementRule */ public static class SecondaryGroupExistingQueue extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, - Collection configuredQueues) throws IOException { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) + throws IOException { List groupNames = groups.getGroups(user); for (int i = 1; i < groupNames.size(); i++) { - if (configuredQueues.contains("root." + groupNames.get(i))) { + String group = groupNames.get(i); + if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group) + || configuredQueues.get(FSQueueType.PARENT).contains( + "root." + group)) { return "root." + groupNames.get(i); } } @@ -167,12 +182,83 @@ public abstract class QueuePlacementRule } /** + * Places apps in queues with name of the submitter under the queue + * returned by the nested rule. + */ + public static class NestedUserQueue extends QueuePlacementRule { + @VisibleForTesting + QueuePlacementRule nestedRule; + + /** + * Parse xml and instantiate the nested rule + */ + @Override + public void initializeFromXml(Element el) + throws AllocationConfigurationException { + NodeList elements = el.getChildNodes(); + + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + Element element = (Element) node; + if ("rule".equals(element.getTagName())) { + QueuePlacementRule rule = QueuePlacementPolicy + .createAndInitializeRule(node); + if (rule == null) { + throw new AllocationConfigurationException( + "Unable to create nested rule in nestedUserQueue rule"); + } + this.nestedRule = rule; + break; + } else { + continue; + } + } + } + + if (this.nestedRule == null) { + throw new AllocationConfigurationException( + "No nested rule specified in rule"); + } + super.initializeFromXml(el); + } + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) + throws IOException { + // Apply the nested rule + String queueName = nestedRule.assignAppToQueue(requestedQueue, user, + groups, configuredQueues); + + if (queueName != null && queueName.length() != 0) { + if (!queueName.startsWith("root.")) { + queueName = "root." + queueName; + } + + // Verify if the queue returned by the nested rule is an configured leaf queue, + // if yes then skip to next rule in the queue placement policy + if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) { + return ""; + } + return queueName + "." + user; + } + return queueName; + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** * Places apps in queues by requested queue of the submitter */ public static class Specified extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, Collection configuredQueues) { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) { if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) { return ""; } else { @@ -190,34 +276,61 @@ public abstract class QueuePlacementRule } /** - * Places all apps in the default queue + * Places apps in the specified default queue. If no default queue is + * specified the app is placed in root.default queue. */ public static class Default extends QueuePlacementRule { + @VisibleForTesting + String defaultQueueName; + @Override - protected String getQueueForApp(String requestedQueue, String user, - Groups groups, Collection configuredQueues) { - return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; + public QueuePlacementRule initialize(boolean create, + Map args) { + if (defaultQueueName == null) { + defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; + } + return super.initialize(create, args); } @Override + public void initializeFromXml(Element el) + throws AllocationConfigurationException { + defaultQueueName = el.getAttribute("queue"); + if (defaultQueueName != null && !defaultQueueName.isEmpty()) { + if (!defaultQueueName.startsWith("root.")) { + defaultQueueName = "root." + defaultQueueName; + } + } else { + defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; + } + super.initializeFromXml(el); + } + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) { + return defaultQueueName; + } + + @Override public boolean isTerminal() { return true; } } - + /** * Rejects all apps */ public static class Reject extends QueuePlacementRule { @Override public String assignAppToQueue(String requestedQueue, String user, - Groups groups, Collection configuredQueues) { + Groups groups, Map> configuredQueues) { return null; } @Override protected String getQueueForApp(String requestedQueue, String user, - Groups groups, Collection configuredQueues) { + Groups groups, Map> configuredQueues) { throw new UnsupportedOperationException(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Tue Aug 19 23:49:39 2014 @@ -23,23 +23,18 @@ import org.apache.hadoop.classification. import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; /** - * A Schedulable represents an entity that can launch tasks, such as a job - * or a queue. It provides a common interface so that algorithms such as fair - * sharing can be applied both within a queue and across queues. There are - * currently two types of Schedulables: JobSchedulables, which represent a - * single job, and QueueSchedulables, which allocate among jobs in their queue. - * - * Separate sets of Schedulables are used for maps and reduces. Each queue has - * both a mapSchedulable and a reduceSchedulable, and so does each job. + * A Schedulable represents an entity that can be scheduled such as an + * application or a queue. It provides a common interface so that algorithms + * such as fair sharing can be applied both within a queue and across queues. * * A Schedulable is responsible for three roles: - * 1) It can launch tasks through assignTask(). - * 2) It provides information about the job/queue to the scheduler, including: + * 1) Assign resources through {@link #assignContainer}. + * 2) It provides information about the app/queue to the scheduler, including: * - Demand (maximum number of tasks required) - * - Number of currently running tasks * - Minimum share (for queues) * - Job/queue weight (for fair sharing) * - Start time and priority (for FIFO) @@ -56,64 +51,61 @@ import org.apache.hadoop.yarn.util.resou */ @Private @Unstable -public abstract class Schedulable { - /** Fair share assigned to this Schedulable */ - private Resource fairShare = Resources.createResource(0); - +public interface Schedulable { /** * Name of job/queue, used for debugging as well as for breaking ties in * scheduling order deterministically. */ - public abstract String getName(); + public String getName(); /** * Maximum number of resources required by this Schedulable. This is defined as * number of currently utilized resources + number of unlaunched resources (that * are either not yet launched or need to be speculated). */ - public abstract Resource getDemand(); + public Resource getDemand(); /** Get the aggregate amount of resources consumed by the schedulable. */ - public abstract Resource getResourceUsage(); + public Resource getResourceUsage(); /** Minimum Resource share assigned to the schedulable. */ - public abstract Resource getMinShare(); + public Resource getMinShare(); /** Maximum Resource share assigned to the schedulable. */ - public abstract Resource getMaxShare(); + public Resource getMaxShare(); /** Job/queue weight in fair sharing. */ - public abstract ResourceWeights getWeights(); + public ResourceWeights getWeights(); /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/ - public abstract long getStartTime(); + public long getStartTime(); /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */ - public abstract Priority getPriority(); + public Priority getPriority(); /** Refresh the Schedulable's demand and those of its children if any. */ - public abstract void updateDemand(); + public void updateDemand(); /** * Assign a container on this node if possible, and return the amount of * resources assigned. */ - public abstract Resource assignContainer(FSSchedulerNode node); + public Resource assignContainer(FSSchedulerNode node); - /** Assign a fair share to this Schedulable. */ - public void setFairShare(Resource fairShare) { - this.fairShare = fairShare; - } + /** + * Preempt a container from this Schedulable if possible. + */ + public RMContainer preemptContainer(); /** Get the fair share assigned to this Schedulable. */ - public Resource getFairShare() { - return fairShare; - } - - /** Convenient toString implementation for debugging. */ - @Override - public String toString() { - return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]", - getName(), getDemand(), getResourceUsage(), fairShare, getWeights()); - } + public Resource getFairShare(); + + /** Assign a fair share to this Schedulable. */ + public void setFairShare(Resource fairShare); + + /** + * Returns true if queue has atleast one app running. Always returns true for + * AppSchedulables. + */ + public boolean isActive(); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Tue Aug 19 23:49:39 2014 @@ -139,4 +139,25 @@ public abstract class SchedulingPolicy { */ public abstract void computeShares( Collection schedulables, Resource totalResources); + + /** + * Check if the resource usage is over the fair share under this policy + * + * @param usage {@link Resource} the resource usage + * @param fairShare {@link Resource} the fair share + * @return true if check passes (is over) or false otherwise + */ + public abstract boolean checkIfUsageOverFairShare( + Resource usage, Resource fairShare); + + /** + * Check if a leaf queue's AM resource usage over its limit under this policy + * + * @param usage {@link Resource} the resource used by application masters + * @param maxAMResource {@link Resource} the maximum allowed resource for + * application masters + * @return true if AM resource usage is over the limit + */ + public abstract boolean checkIfAMResourceUsageOverLimit( + Resource usage, Resource maxAMResource); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Tue Aug 19 23:49:39 2014 @@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurab @Private @Unstable public interface WeightAdjuster { - public double adjustWeight(AppSchedulable app, double curWeight); + public double adjustWeight(FSAppAttempt app, double curWeight); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java Tue Aug 19 23:49:39 2014 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; +import java.util.ArrayList; import java.util.Collection; import org.apache.hadoop.yarn.api.records.Resource; @@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.res public class ComputeFairShares { private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25; - + + /** + * Compute fair share of the given schedulables.Fair share is an allocation of + * shares considering only active schedulables ie schedulables which have + * running apps. + * + * @param schedulables + * @param totalResources + * @param type + */ + public static void computeShares( + Collection schedulables, Resource totalResources, + ResourceType type) { + Collection activeSchedulables = new ArrayList(); + for (Schedulable sched : schedulables) { + if (sched.isActive()) { + activeSchedulables.add(sched); + } else { + setResourceValue(0, sched.getFairShare(), type); + } + } + + computeSharesInternal(activeSchedulables, totalResources, type); + } + /** * Given a set of Schedulables and a number of slots, compute their weighted * fair shares. The min and max shares and of the Schedulables are assumed to @@ -75,7 +100,7 @@ public class ComputeFairShares { * because resourceUsedWithWeightToResourceRatio is linear-time and the number of * iterations of binary search is a constant (dependent on desired precision). */ - public static void computeShares( + private static void computeSharesInternal( Collection schedulables, Resource totalResources, ResourceType type) { if (schedulables.isEmpty()) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Tue Aug 19 23:49:39 2014 @@ -70,6 +70,16 @@ public class DominantResourceFairnessPol } @Override + public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { + return !Resources.fitsIn(usage, fairShare); + } + + @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return !Resources.fitsIn(usage, maxAMResource); + } + + @Override public void initialize(Resource clusterCapacity) { comparator.setClusterCapacity(clusterCapacity); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Tue Aug 19 23:49:39 2014 @@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch private static class FairShareComparator implements Comparator, Serializable { private static final long serialVersionUID = 5564969375856699313L; + private static final Resource ONE = Resources.createResource(1); @Override public int compare(Schedulable s1, Schedulable s2) { @@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch s1.getResourceUsage(), minShare1); boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, s2.getResourceUsage(), minShare2); - Resource one = Resources.createResource(1); minShareRatio1 = (double) s1.getResourceUsage().getMemory() - / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory(); minShareRatio2 = (double) s2.getResourceUsage().getMemory() - / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory(); + / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory(); useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeights().getWeight(ResourceType.MEMORY); useToWeightRatio2 = s2.getResourceUsage().getMemory() / @@ -120,6 +120,16 @@ public class FairSharePolicy extends Sch } @Override + public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { + return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare); + } + + @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return usage.getMemory() > maxAMResource.getMemory(); + } + + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Tue Aug 19 23:49:39 2014 @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re import java.io.Serializable; import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -89,6 +88,18 @@ public class FifoPolicy extends Scheduli } @Override + public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { + throw new UnsupportedOperationException( + "FifoPolicy doesn't support checkIfUsageOverFairshare operation, " + + "as FifoPolicy only works for FSLeafQueue."); + } + + @Override + public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) { + return usage.getMemory() > maxAMResource.getMemory(); + } + + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Aug 19 23:49:39 2014 @@ -25,7 +25,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.commons.logging.Log; @@ -38,7 +37,6 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -54,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; -import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -68,7 +64,6 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -76,11 +71,10 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -104,7 +98,8 @@ import com.google.common.annotations.Vis @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") -public class FifoScheduler extends AbstractYarnScheduler implements +public class FifoScheduler extends + AbstractYarnScheduler implements Configurable { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); @@ -114,11 +109,6 @@ public class FifoScheduler extends Abstr Configuration conf; - protected Map nodes = new ConcurrentHashMap(); - - private boolean initialized; - private Resource minimumAllocation; - private Resource maximumAllocation; private boolean usePortForNodeName; private ActiveUsersManager activeUsersManager; @@ -185,8 +175,60 @@ public class FifoScheduler extends Abstr public ActiveUsersManager getActiveUsersManager() { return activeUsersManager; } + + @Override + public void recoverContainer(Resource clusterResource, + SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) { + if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { + return; + } + increaseUsedResources(rmContainer); + updateAppHeadRoom(schedulerAttempt); + updateAvailableResourcesMetrics(); + } }; + public FifoScheduler() { + super(FifoScheduler.class.getName()); + } + + private synchronized void initScheduler(Configuration conf) { + validateConf(conf); + //Use ConcurrentSkipListMap because applications need to be ordered + this.applications = + new ConcurrentSkipListMap>(); + this.minimumAllocation = + Resources.createResource(conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); + this.maximumAllocation = + Resources.createResource(conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + this.usePortForNodeName = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, + conf); + this.activeUsersManager = new ActiveUsersManager(metrics); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + initScheduler(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + super.serviceStop(); + } + @Override public synchronized void setConf(Configuration conf) { this.conf = conf; @@ -218,18 +260,13 @@ public class FifoScheduler extends Abstr } @Override - public Resource getMinimumResourceCapability() { - return minimumAllocation; - } - - @Override public int getNumClusterNodes() { return nodes.size(); } - + @Override - public Resource getMaximumResourceCapability() { - return maximumAllocation; + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; } @Override @@ -237,31 +274,8 @@ public class FifoScheduler extends Abstr reinitialize(Configuration conf, RMContext rmContext) throws IOException { setConf(conf); - if (!this.initialized) { - validateConf(conf); - this.rmContext = rmContext; - //Use ConcurrentSkipListMap because applications need to be ordered - this.applications = - new ConcurrentSkipListMap(); - this.minimumAllocation = - Resources.createResource(conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = - Resources.createResource(conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - this.usePortForNodeName = conf.getBoolean( - YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, - YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); - this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, - conf); - this.activeUsersManager = new ActiveUsersManager(metrics); - this.initialized = true; - } } - @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, @@ -278,21 +292,7 @@ public class FifoScheduler extends Abstr clusterResource, minimumAllocation, maximumAllocation); // Release containers - for (ContainerId releasedContainer : release) { - RMContainer rmContainer = getRMContainer(releasedContainer); - if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FifoScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainer); - } - containerCompleted(rmContainer, - SchedulerUtils.createAbnormalContainerStatus( - releasedContainer, - SchedulerUtils.RELEASED_CONTAINER), - RMContainerEventType.RELEASED); - } + releaseContainers(release, application); synchronized (application) { @@ -332,50 +332,35 @@ public class FifoScheduler extends Abstr } } - @VisibleForTesting - FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) { - SchedulerApplication app = - applications.get(applicationAttemptId.getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - - @Override - public SchedulerAppReport getSchedulerAppInfo( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : new SchedulerAppReport(app); - } - - @Override - public ApplicationResourceUsageReport getAppResourceUsageReport( - ApplicationAttemptId applicationAttemptId) { - FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId); - return app == null ? null : app.getResourceUsageReport(); - } - private FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } - private synchronized void addApplication(ApplicationId applicationId, - String queue, String user) { - SchedulerApplication application = - new SchedulerApplication(DEFAULT_QUEUE, user); + @VisibleForTesting + public synchronized void addApplication(ApplicationId applicationId, + String queue, String user, boolean isAppRecovering) { + SchedulerApplication application = + new SchedulerApplication(DEFAULT_QUEUE, user); applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); - rmContext.getDispatcher().getEventHandler() + if (isAppRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); + } + } else { + rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); + } } - private synchronized void + @VisibleForTesting + public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, - boolean transferStateFromPreviousAttempt) { - SchedulerApplication application = + boolean transferStateFromPreviousAttempt, + boolean isAttemptRecovering) { + SchedulerApplication application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); // TODO: Fix store @@ -392,14 +377,22 @@ public class FifoScheduler extends Abstr metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); - rmContext.getDispatcher().getEventHandler().handle( + if (isAttemptRecovering) { + if (LOG.isDebugEnabled()) { + LOG.debug(appAttemptId + + " is recovering. Skipping notifying ATTEMPT_ADDED"); + } + } else { + rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); + } } private synchronized void doneApplication(ApplicationId applicationId, RMAppState finalState) { - SchedulerApplication application = applications.get(applicationId); + SchedulerApplication application = + applications.get(applicationId); if (application == null){ LOG.warn("Couldn't find application " + applicationId); return; @@ -417,7 +410,7 @@ public class FifoScheduler extends Abstr RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers) throws IOException { FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId); - SchedulerApplication application = + SchedulerApplication application = applications.get(applicationAttemptId.getApplicationId()); if (application == null || attempt == null) { throw new IOException("Unknown application " + applicationAttemptId + @@ -433,7 +426,7 @@ public class FifoScheduler extends Abstr LOG.info("Skip killing " + container.getContainerId()); continue; } - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -454,10 +447,13 @@ public class FifoScheduler extends Abstr " #applications=" + applications.size()); // Try to assign containers to applications in fifo order - for (Map.Entry e : applications + for (Map.Entry> e : applications .entrySet()) { - FiCaSchedulerApp application = - (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt(); + FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt(); + if (application == null) { + continue; + } + LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { @@ -494,10 +490,13 @@ public class FifoScheduler extends Abstr // Update the applications' headroom to correctly take into // account the containers assigned in this update. - for (SchedulerApplication application : applications.values()) { + for (SchedulerApplication application : applications.values()) { FiCaSchedulerApp attempt = (FiCaSchedulerApp) application.getCurrentAppAttempt(); - attempt.setHeadroom(Resources.subtract(clusterResource, usedResource)); + if (attempt == null) { + continue; + } + updateAppHeadRoom(attempt); } } @@ -668,11 +667,10 @@ public class FifoScheduler extends Abstr application.allocate(type, node, priority, request, container); // Inform the node - node.allocateContainer(application.getApplicationId(), - rmContainer); + node.allocateContainer(rmContainer); // Update usage for this container - Resources.addTo(usedResource, capability); + increaseUsedResources(rmContainer); } } @@ -702,7 +700,7 @@ public class FifoScheduler extends Abstr for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - containerCompleted(getRMContainer(containerId), + completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -716,9 +714,22 @@ public class FifoScheduler extends Abstr LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = " + node.getAvailableResource()); } - - metrics.setAvailableResourcesToQueue( - Resources.subtract(clusterResource, usedResource)); + + updateAvailableResourcesMetrics(); + } + + private void increaseUsedResources(RMContainer rmContainer) { + Resources.addTo(usedResource, rmContainer.getAllocatedResource()); + } + + private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) { + schedulerAttempt.setHeadroom(Resources.subtract(clusterResource, + usedResource)); + } + + private void updateAvailableResourcesMetrics() { + metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource, + usedResource)); } @Override @@ -728,6 +739,9 @@ public class FifoScheduler extends Abstr { NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event; addNode(nodeAddedEvent.getAddedRMNode()); + recoverContainersOnNode(nodeAddedEvent.getContainerReports(), + nodeAddedEvent.getAddedRMNode()); + } break; case NODE_REMOVED: @@ -747,7 +761,8 @@ public class FifoScheduler extends Abstr { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), - appAddedEvent.getQueue(), appAddedEvent.getUser()); + appAddedEvent.getQueue(), appAddedEvent.getUser(), + appAddedEvent.getIsAppRecovering()); } break; case APP_REMOVED: @@ -762,7 +777,8 @@ public class FifoScheduler extends Abstr AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), - appAttemptAddedEvent.getTransferStateFromPreviousAttempt()); + appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), + appAttemptAddedEvent.getIsAttemptRecovering()); } break; case APP_ATTEMPT_REMOVED: @@ -785,7 +801,7 @@ public class FifoScheduler extends Abstr ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - containerCompleted(getRMContainer(containerid), + completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), @@ -797,25 +813,9 @@ public class FifoScheduler extends Abstr } } - private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { - // Get the application for the finished container - FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId); - if (application == null) { - LOG.info("Unknown application " - + containerId.getApplicationAttemptId().getApplicationId() - + " launched container " + containerId + " on node: " + node); - // Some unknown container sneaked into the system. Kill it. - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId)); - - return; - } - - application.containerLaunchedOnNode(containerId, node.getNodeID()); - } - @Lock(FifoScheduler.class) - private synchronized void containerCompleted(RMContainer rmContainer, + @Override + protected synchronized void completedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); @@ -856,7 +856,6 @@ public class FifoScheduler extends Abstr } - private Resource clusterResource = recordFactory.newRecordInstance(Resource.class); private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { @@ -866,7 +865,7 @@ public class FifoScheduler extends Abstr } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - containerCompleted(container, + completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -903,28 +902,11 @@ public class FifoScheduler extends Abstr } @Override - public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) { - FiCaSchedulerNode node = getNode(nodeId); - return node == null ? null : new SchedulerNodeReport(node); - } - - @Override public RMContainer getRMContainer(ContainerId containerId) { FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId); return (attempt == null) ? null : attempt.getRMContainer(containerId); } - private FiCaSchedulerApp getCurrentAttemptForContainer( - ContainerId containerId) { - SchedulerApplication app = - applications.get(containerId.getApplicationAttemptId() - .getApplicationId()); - if (app != null) { - return (FiCaSchedulerApp) app.getCurrentAppAttempt(); - } - return null; - } - @Override public QueueMetrics getRootQueueMetrics() { return DEFAULT_QUEUE.getMetrics(); @@ -935,13 +917,14 @@ public class FifoScheduler extends Abstr QueueACL acl, String queueName) { return DEFAULT_QUEUE.hasAccess(acl, callerUGI); } - + @Override - public synchronized List getAppsInQueue(String queueName) { + public synchronized List + getAppsInQueue(String queueName) { if (queueName.equals(DEFAULT_QUEUE.getQueueName())) { - List attempts = new ArrayList( - applications.size()); - for (SchedulerApplication app : applications.values()) { + List attempts = + new ArrayList(applications.size()); + for (SchedulerApplication app : applications.values()) { attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId()); } return attempts; @@ -950,4 +933,7 @@ public class FifoScheduler extends Abstr } } + public Resource getUsedResource() { + return usedResource; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Tue Aug 19 23:49:39 2014 @@ -19,22 +19,32 @@ package org.apache.hadoop.yarn.server.resourcemanager.security; import java.io.IOException; -import java.util.HashMap; -import java.util.Map; +import java.security.SecureRandom; +import java.util.HashSet; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; - -import javax.crypto.SecretKey; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState; +import org.apache.hadoop.yarn.server.security.MasterKeyData; + +import com.google.common.annotations.VisibleForTesting; /** * AMRM-tokens are per ApplicationAttempt. If users redistribute their @@ -49,40 +59,73 @@ public class AMRMTokenSecretManager exte private static final Log LOG = LogFactory .getLog(AMRMTokenSecretManager.class); - private SecretKey masterKey; + private int serialNo = new SecureRandom().nextInt(); + private MasterKeyData nextMasterKey; + private MasterKeyData currentMasterKey; + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Lock readLock = readWriteLock.readLock(); + private final Lock writeLock = readWriteLock.writeLock(); + private final Timer timer; private final long rollingInterval; + private final long activationDelay; + private RMContext rmContext; - private final Map passwords = - new HashMap(); + private final Set appAttemptSet = + new HashSet(); /** * Create an {@link AMRMTokenSecretManager} */ - public AMRMTokenSecretManager(Configuration conf) { - rollMasterKey(); + public AMRMTokenSecretManager(Configuration conf, RMContext rmContext) { + this.rmContext = rmContext; this.timer = new Timer(); this.rollingInterval = conf .getLong( YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS, YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000; + // Adding delay = 1.5 * expiry interval makes sure that all active AMs get + // the updated shared-key. + this.activationDelay = + (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5); + LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval + + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms"); + if (rollingInterval <= activationDelay * 2) { + throw new IllegalArgumentException( + YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS + + " should be more than 2 X " + + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS); + } } public void start() { - this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval); + if (this.currentMasterKey == null) { + this.currentMasterKey = createNewMasterKey(); + AMRMTokenSecretManagerState state = + AMRMTokenSecretManagerState.newInstance( + this.currentMasterKey.getMasterKey(), null); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state, + false); + } + this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval, + rollingInterval); } public void stop() { this.timer.cancel(); } - public synchronized void applicationMasterFinished( - ApplicationAttemptId appAttemptId) { - if (LOG.isDebugEnabled()) { - LOG.debug("Application finished, removing password for " + appAttemptId); + public void applicationMasterFinished(ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Application finished, removing password for " + appAttemptId); + this.appAttemptSet.remove(appAttemptId); + } finally { + this.writeLock.unlock(); } - this.passwords.remove(appAttemptId); } private class MasterKeyRoller extends TimerTask { @@ -93,49 +136,100 @@ public class AMRMTokenSecretManager exte } @Private - public synchronized void setMasterKey(SecretKey masterKey) { - this.masterKey = masterKey; + void rollMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Rolling master-key for amrm-tokens"); + this.nextMasterKey = createNewMasterKey(); + AMRMTokenSecretManagerState state = + AMRMTokenSecretManagerState.newInstance( + this.currentMasterKey.getMasterKey(), + this.nextMasterKey.getMasterKey()); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state, + true); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } finally { + this.writeLock.unlock(); + } } - @Private - public synchronized SecretKey getMasterKey() { - return this.masterKey; + private class NextKeyActivator extends TimerTask { + @Override + public void run() { + activateNextMasterKey(); + } + } + + public void activateNextMasterKey() { + this.writeLock.lock(); + try { + LOG.info("Activating next master key with id: " + + this.nextMasterKey.getMasterKey().getKeyId()); + this.currentMasterKey = this.nextMasterKey; + this.nextMasterKey = null; + AMRMTokenSecretManagerState state = + AMRMTokenSecretManagerState.newInstance( + this.currentMasterKey.getMasterKey(), null); + rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state, + true); + } finally { + this.writeLock.unlock(); + } } @Private - synchronized void rollMasterKey() { - LOG.info("Rolling master-key for amrm-tokens"); - this.masterKey = generateSecret(); + @VisibleForTesting + public MasterKeyData createNewMasterKey() { + this.writeLock.lock(); + try { + return new MasterKeyData(serialNo++, generateSecret()); + } finally { + this.writeLock.unlock(); + } } - /** - * Create a password for a given {@link AMRMTokenIdentifier}. Used to - * send to the AppicationAttempt which can give it back during authentication. - */ - @Override - public synchronized byte[] createPassword( - AMRMTokenIdentifier identifier) { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Creating password for " + applicationAttemptId); - } - byte[] password = createPassword(identifier.getBytes(), masterKey); - this.passwords.put(applicationAttemptId, password); - return password; + public Token createAndGetAMRMToken( + ApplicationAttemptId appAttemptId) { + this.writeLock.lock(); + try { + LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId); + AMRMTokenIdentifier identifier = + new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey() + .getKeyId()); + byte[] password = this.createPassword(identifier); + appAttemptSet.add(appAttemptId); + return new Token(identifier.getBytes(), password, + identifier.getKind(), new Text()); + } finally { + this.writeLock.unlock(); + } + } + + // If nextMasterKey is not Null, then return nextMasterKey + // otherwise return currentMasterKey + @VisibleForTesting + public MasterKeyData getMasterKey() { + this.readLock.lock(); + try { + return nextMasterKey == null ? currentMasterKey : nextMasterKey; + } finally { + this.readLock.unlock(); + } } /** * Populate persisted password of AMRMToken back to AMRMTokenSecretManager. */ - public synchronized void - addPersistedPassword(Token token) throws IOException { - AMRMTokenIdentifier identifier = token.decodeIdentifier(); - if (LOG.isDebugEnabled()) { + public void addPersistedPassword(Token token) + throws IOException { + this.writeLock.lock(); + try { + AMRMTokenIdentifier identifier = token.decodeIdentifier(); LOG.debug("Adding password for " + identifier.getApplicationAttemptId()); + appAttemptSet.add(identifier.getApplicationAttemptId()); + } finally { + this.writeLock.unlock(); } - this.passwords.put(identifier.getApplicationAttemptId(), - token.getPassword()); } /** @@ -143,19 +237,33 @@ public class AMRMTokenSecretManager exte * Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}. */ @Override - public synchronized byte[] retrievePassword( - AMRMTokenIdentifier identifier) throws InvalidToken { - ApplicationAttemptId applicationAttemptId = - identifier.getApplicationAttemptId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to retrieve password for " + applicationAttemptId); - } - byte[] password = this.passwords.get(applicationAttemptId); - if (password == null) { - throw new InvalidToken("Password not found for ApplicationAttempt " - + applicationAttemptId); + public byte[] retrievePassword(AMRMTokenIdentifier identifier) + throws InvalidToken { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to retrieve password for " + applicationAttemptId); + } + if (!appAttemptSet.contains(applicationAttemptId)) { + throw new InvalidToken(applicationAttemptId + + " not found in AMRMTokenSecretManager."); + } + if (identifier.getKeyId() == this.currentMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.currentMasterKey.getSecretKey()); + } else if (nextMasterKey != null + && identifier.getKeyId() == this.nextMasterKey.getMasterKey() + .getKeyId()) { + return createPassword(identifier.getBytes(), + this.nextMasterKey.getSecretKey()); + } + throw new InvalidToken("Invalid AMRMToken from " + applicationAttemptId); + } finally { + this.readLock.unlock(); } - return password; } /** @@ -167,4 +275,61 @@ public class AMRMTokenSecretManager exte return new AMRMTokenIdentifier(); } + @Private + @VisibleForTesting + public MasterKeyData getCurrnetMasterKeyData() { + this.readLock.lock(); + try { + return this.currentMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Private + @VisibleForTesting + public MasterKeyData getNextMasterKeyData() { + this.readLock.lock(); + try { + return this.nextMasterKey; + } finally { + this.readLock.unlock(); + } + } + + @Override + @Private + protected byte[] createPassword(AMRMTokenIdentifier identifier) { + this.readLock.lock(); + try { + ApplicationAttemptId applicationAttemptId = + identifier.getApplicationAttemptId(); + LOG.info("Creating password for " + applicationAttemptId); + return createPassword(identifier.getBytes(), getMasterKey() + .getSecretKey()); + } finally { + this.readLock.unlock(); + } + } + + public void recover(RMState state) { + if (state.getAMRMTokenSecretManagerState() != null) { + // recover the current master key + MasterKey currentKey = + state.getAMRMTokenSecretManagerState().getCurrentMasterKey(); + this.currentMasterKey = + new MasterKeyData(currentKey, createSecretKey(currentKey.getBytes() + .array())); + + // recover the next master key if not null + MasterKey nextKey = + state.getAMRMTokenSecretManagerState().getNextMasterKey(); + if (nextKey != null) { + this.nextMasterKey = + new MasterKeyData(nextKey, createSecretKey(nextKey.getBytes() + .array())); + this.timer.schedule(new NextKeyActivator(), this.activationDelay); + } + } + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Tue Aug 19 23:49:39 2014 @@ -388,7 +388,11 @@ public class DelegationTokenRenewer exte // If user provides incorrect token then it should not be added for // renewal. for (DelegationTokenToRenew dtr : tokenList) { - renewToken(dtr); + try { + renewToken(dtr); + } catch (IOException ioe) { + throw new IOException("Failed to renew token: " + dtr.token, ioe); + } } for (DelegationTokenToRenew dtr : tokenList) { addTokenToList(dtr); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java Tue Aug 19 23:49:39 2014 @@ -27,6 +27,7 @@ import org.apache.hadoop.classification. import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -34,8 +35,8 @@ import org.apache.hadoop.yarn.security.C import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.security.MasterKeyData; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; /** * SecretManager for ContainerTokens. This is RM-specific and rolls the @@ -169,11 +170,13 @@ public class RMContainerTokenSecretManag * @param nodeId * @param appSubmitter * @param capability + * @param priority + * @param createTime * @return the container-token */ - public Token - createContainerToken(ContainerId containerId, NodeId nodeId, - String appSubmitter, Resource capability) { + public Token createContainerToken(ContainerId containerId, NodeId nodeId, + String appSubmitter, Resource capability, Priority priority, + long createTime) { byte[] password; ContainerTokenIdentifier tokenIdentifier; long expiryTimeStamp = @@ -185,7 +188,8 @@ public class RMContainerTokenSecretManag tokenIdentifier = new ContainerTokenIdentifier(containerId, nodeId.toString(), appSubmitter, capability, expiryTimeStamp, this.currentMasterKey - .getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp()); + .getMasterKey().getKeyId(), + ResourceManager.getClusterTimeStamp(), priority, createTime); password = this.createPassword(tokenIdentifier); } finally { Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Tue Aug 19 23:49:39 2014 @@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.DelegationKey; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana addPersistedDelegationToken(entry.getKey(), entry.getValue()); } } + + public long getRenewDate(RMDelegationTokenIdentifier ident) + throws InvalidToken { + DelegationTokenInformation info = currentTokens.get(ident); + if (info == null) { + throw new InvalidToken("token (" + ident.toString() + + ") can't be found in cache"); + } + return info.getRenewDate(); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java Tue Aug 19 23:49:39 2014 @@ -36,7 +36,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics; import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; @@ -110,19 +112,42 @@ public class AppBlock extends HtmlBlock setTitle(join("Application ", aid)); - info("Application Overview"). - _("User:", app.getUser()). - _("Name:", app.getName()). - _("Application Type:", app.getApplicationType()). - _("Application Tags:", app.getApplicationTags()). - _("State:", app.getState()). - _("FinalStatus:", app.getFinalStatus()). - _("Started:", Times.format(app.getStartTime())). - _("Elapsed:", StringUtils.formatTime( - Times.elapsed(app.getStartTime(), app.getFinishTime()))). - _("Tracking URL:", !app.isTrackingUrlReady() ? - "#" : app.getTrackingUrlPretty(), app.getTrackingUI()). - _("Diagnostics:", app.getNote()); + RMAppMetrics appMerics = rmApp.getRMAppMetrics(); + RMAppAttemptMetrics attemptMetrics = + rmApp.getCurrentAppAttempt().getRMAppAttemptMetrics(); + info("Application Overview") + ._("User:", app.getUser()) + ._("Name:", app.getName()) + ._("Application Type:", app.getApplicationType()) + ._("Application Tags:", app.getApplicationTags()) + ._("State:", app.getState()) + ._("FinalStatus:", app.getFinalStatus()) + ._("Started:", Times.format(app.getStartTime())) + ._("Elapsed:", + StringUtils.formatTime(Times.elapsed(app.getStartTime(), + app.getFinishTime()))) + ._("Tracking URL:", + !app.isTrackingUrlReady() ? "#" : app.getTrackingUrlPretty(), + app.getTrackingUI()) + ._("Diagnostics:", app.getNote()); + + DIV pdiv = html. + _(InfoBlock.class). + div(_INFO_WRAP); + info("Application Overview").clear(); + info("Application Metrics") + ._("Total Resource Preempted:", + appMerics.getResourcePreempted()) + ._("Total Number of Non-AM Containers Preempted:", + String.valueOf(appMerics.getNumNonAMContainersPreempted())) + ._("Total Number of AM Containers Preempted:", + String.valueOf(appMerics.getNumAMContainersPreempted())) + ._("Resource Preempted from Current Attempt:", + attemptMetrics.getResourcePreempted()) + ._("Number of Non-AM Containers Preempted from Current Attempt:", + String.valueOf(attemptMetrics + .getNumNonAMContainersPreempted())); + pdiv._(); Collection attempts = rmApp.getAppAttempts().values(); String amString = Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Tue Aug 19 23:49:39 2014 @@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.uti import java.util.ArrayList; import org.apache.hadoop.util.StringUtils; -import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -109,7 +108,7 @@ class CapacitySchedulerPage extends RmVi _("Absolute Used Capacity:", percent(lqinfo.getAbsoluteUsedCapacity() / 100)). _("Absolute Capacity:", percent(lqinfo.getAbsoluteCapacity() / 100)). _("Absolute Max Capacity:", percent(lqinfo.getAbsoluteMaxCapacity() / 100)). - _("Used Resources:", StringEscapeUtils.escapeHtml(lqinfo.getResourcesUsed().toString())). + _("Used Resources:", lqinfo.getResourcesUsed().toString()). _("Num Schedulable Applications:", Integer.toString(lqinfo.getNumActiveApplications())). _("Num Non-Schedulable Applications:", Integer.toString(lqinfo.getNumPendingApplications())). _("Num Containers:", Integer.toString(lqinfo.getNumContainers())). @@ -263,7 +262,7 @@ class CapacitySchedulerPage extends RmVi " var q = $('.q', data.rslt.obj).first().text();", " if (q == 'root') q = '';", " else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';", - " $('#apps').dataTable().fnFilter(q, 3, true);", + " $('#apps').dataTable().fnFilter(q, 4, true);", " });", " $('#cs').show();", "});")._(). Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java Tue Aug 19 23:49:39 2014 @@ -137,7 +137,7 @@ class DefaultSchedulerPage extends RmVie " $('#cs').bind('select_node.jstree', function(e, data) {", " var q = $('.q', data.rslt.obj).first().text();", " if (q == 'root') q = '';", - " $('#apps').dataTable().fnFilter(q, 3);", + " $('#apps').dataTable().fnFilter(q, 4);", " });", " $('#cs').show();", "});")._(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Tue Aug 19 23:49:39 2014 @@ -25,6 +25,8 @@ import static org.apache.hadoop.yarn.web import java.util.Collection; import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.commons.lang.StringEscapeUtils; @@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo; @@ -60,7 +63,14 @@ public class FairSchedulerAppsBlock exte super(ctx); FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler(); fsinfo = new FairSchedulerInfo(scheduler); - apps = rmContext.getRMApps(); + apps = new ConcurrentHashMap(); + for (Map.Entry entry : rmContext.getRMApps().entrySet()) { + if (!(RMAppState.NEW.equals(entry.getValue().getState()) + || RMAppState.NEW_SAVING.equals(entry.getValue().getState()) + || RMAppState.SUBMITTED.equals(entry.getValue().getState()))) { + apps.put(entry.getKey(), entry.getValue()); + } + } this.conf = conf; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Tue Aug 19 23:49:39 2014 @@ -22,7 +22,6 @@ import static org.apache.hadoop.yarn.uti import java.util.Collection; -import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo; @@ -65,20 +64,16 @@ public class FairSchedulerPage extends R @Override protected void render(Block html) { ResponseInfo ri = info("\'" + qinfo.getQueueName() + "\' Queue Status"). - _("Used Resources:", StringEscapeUtils.escapeHtml( - qinfo.getUsedResources().toString())). + _("Used Resources:", qinfo.getUsedResources().toString()). _("Num Active Applications:", qinfo.getNumActiveApplications()). _("Num Pending Applications:", qinfo.getNumPendingApplications()). - _("Min Resources:", StringEscapeUtils.escapeHtml( - qinfo.getMinResources().toString())). - _("Max Resources:", StringEscapeUtils.escapeHtml( - qinfo.getMaxResources().toString())); + _("Min Resources:", qinfo.getMinResources().toString()). + _("Max Resources:", qinfo.getMaxResources().toString()); int maxApps = qinfo.getMaxApplications(); if (maxApps < Integer.MAX_VALUE) { ri._("Max Running Applications:", qinfo.getMaxApplications()); } - ri._("Fair Share:", StringEscapeUtils.escapeHtml( - qinfo.getFairShare().toString())); + ri._("Fair Share:", qinfo.getFairShare().toString()); html._(InfoBlock.class); @@ -215,7 +210,7 @@ public class FairSchedulerPage extends R " var q = $('.q', data.rslt.obj).first().text();", " if (q == 'root') q = '';", " else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';", - " $('#apps').dataTable().fnFilter(q, 3, true);", + " $('#apps').dataTable().fnFilter(q, 4, true);", " });", " $('#cs').show();", "});")._().