Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B47F3D3C4 for ; Tue, 17 Jul 2012 01:51:42 +0000 (UTC) Received: (qmail 5130 invoked by uid 500); 17 Jul 2012 01:51:42 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 5086 invoked by uid 500); 17 Jul 2012 01:51:42 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 5076 invoked by uid 99); 17 Jul 2012 01:51:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 01:51:42 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_OPPORTUN1 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Jul 2012 01:51:33 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7C36F2388A6C; Tue, 17 Jul 2012 01:51:11 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1362334 [2/3] - in /hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/ hadoop-mapreduce-examples/ hadoop-yarn/hadoop-yarn-server/hadoop... Date: Tue, 17 Jul 2012 01:51:10 -0000 To: mapreduce-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120717015111.7C36F2388A6C@eris.apache.org> Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Tue Jul 17 01:51:07 2012 @@ -18,31 +18,410 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; -/** - * This class extends the application lifecycle management contained with - * the {@link SchedulerApp} class and adds delay-scheduling information - * specific to the FairScheduler. - */ -public class FSSchedulerApp extends SchedulerApp { - private static final Log LOG = LogFactory.getLog(SchedulerApp.class); +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; + +public class FSSchedulerApp extends SchedulerApplication { + + private static final Log LOG = LogFactory.getLog(FSSchedulerApp.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + private final AppSchedulingInfo appSchedulingInfo; + private final Queue queue; + + private final Resource currentConsumption = recordFactory + .newRecordInstance(Resource.class); + private Resource resourceLimit = recordFactory + .newRecordInstance(Resource.class); + + private Map liveContainers + = new HashMap(); + private List newlyAllocatedContainers = + new ArrayList(); + + final Map> reservedContainers = + new HashMap>(); + + /** + * Count how many times the application has been given an opportunity + * to schedule a task at each priority. Each time the scheduler + * asks the application for a task at this priority, it is incremented, + * and each time the application successfully schedules a task, it + * is reset to 0. + */ + Multiset schedulingOpportunities = HashMultiset.create(); + + Multiset reReservations = HashMultiset.create(); + + Resource currentReservation = recordFactory + .newRecordInstance(Resource.class); + + private final RMContext rmContext; + public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, + String user, Queue queue, ActiveUsersManager activeUsersManager, + RMContext rmContext, ApplicationStore store) { + this.rmContext = rmContext; + this.appSchedulingInfo = + new AppSchedulingInfo(applicationAttemptId, user, queue, + activeUsersManager, store); + this.queue = queue; + } + + public ApplicationId getApplicationId() { + return this.appSchedulingInfo.getApplicationId(); + } + + @Override + public ApplicationAttemptId getApplicationAttemptId() { + return this.appSchedulingInfo.getApplicationAttemptId(); + } + + public String getUser() { + return this.appSchedulingInfo.getUser(); + } + + public synchronized void updateResourceRequests( + List requests) { + this.appSchedulingInfo.updateResourceRequests(requests); + } + + public Map getResourceRequests(Priority priority) { + return this.appSchedulingInfo.getResourceRequests(priority); + } + + public int getNewContainerId() { + return this.appSchedulingInfo.getNewContainerId(); + } + + public Collection getPriorities() { + return this.appSchedulingInfo.getPriorities(); + } + + public ResourceRequest getResourceRequest(Priority priority, String nodeAddress) { + return this.appSchedulingInfo.getResourceRequest(priority, nodeAddress); + } + + public synchronized int getTotalRequiredResources(Priority priority) { + return getResourceRequest(priority, RMNode.ANY).getNumContainers(); + } + + public Resource getResource(Priority priority) { + return this.appSchedulingInfo.getResource(priority); + } + + /** + * Is this application pending? + * @return true if it is else false. + */ + @Override + public boolean isPending() { + return this.appSchedulingInfo.isPending(); + } + + public String getQueueName() { + return this.appSchedulingInfo.getQueueName(); + } + + /** + * Get the list of live containers + * @return All of the live containers + */ + @Override + public synchronized Collection getLiveContainers() { + return new ArrayList(liveContainers.values()); + } + + public synchronized void stop(RMAppAttemptState rmAppAttemptFinalState) { + // Cleanup all scheduling information + this.appSchedulingInfo.stop(rmAppAttemptFinalState); + } + + @SuppressWarnings("unchecked") + public synchronized void containerLaunchedOnNode(ContainerId containerId, + NodeId nodeId) { + // Inform the container + RMContainer rmContainer = + getRMContainer(containerId); + if (rmContainer == null) { + // Some unknown container sneaked into the system. Kill it. + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeCleanContainerEvent(nodeId, containerId)); + return; + } + + rmContainer.handle(new RMContainerEvent(containerId, + RMContainerEventType.LAUNCHED)); + } + + synchronized public void containerCompleted(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + + Container container = rmContainer.getContainer(); + ContainerId containerId = container.getId(); + + // Inform the container + rmContainer.handle( + new RMContainerFinishedEvent( + containerId, + containerStatus, + event) + ); + LOG.info("Completed container: " + rmContainer.getContainerId() + + " in state: " + rmContainer.getState() + " event:" + event); + + // Remove from the list of containers + liveContainers.remove(rmContainer.getContainerId()); + + RMAuditLogger.logSuccess(getUser(), + AuditConstants.RELEASE_CONTAINER, "SchedulerApp", + getApplicationId(), containerId); + + // Update usage metrics + Resource containerResource = rmContainer.getContainer().getResource(); + queue.getMetrics().releaseResources(getUser(), 1, containerResource); + Resources.subtractFrom(currentConsumption, containerResource); + } + + synchronized public List pullNewlyAllocatedContainers() { + List returnContainerList = new ArrayList( + newlyAllocatedContainers.size()); + for (RMContainer rmContainer : newlyAllocatedContainers) { + rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(), + RMContainerEventType.ACQUIRED)); + returnContainerList.add(rmContainer.getContainer()); + } + newlyAllocatedContainers.clear(); + return returnContainerList; + } + + public Resource getCurrentConsumption() { + return this.currentConsumption; + } + + synchronized public void showRequests() { + if (LOG.isDebugEnabled()) { + for (Priority priority : getPriorities()) { + Map requests = getResourceRequests(priority); + if (requests != null) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " headRoom=" + getHeadroom() + + " currentConsumption=" + currentConsumption.getMemory()); + for (ResourceRequest request : requests.values()) { + LOG.debug("showRequests:" + " application=" + getApplicationId() + + " request=" + request); + } + } + } + } + } + + public synchronized RMContainer getRMContainer(ContainerId id) { + return liveContainers.get(id); + } + + synchronized public void addSchedulingOpportunity(Priority priority) { + this.schedulingOpportunities.setCount(priority, + schedulingOpportunities.count(priority) + 1); + } + + /** + * Return the number of times the application has been given an opportunity + * to schedule a task at the given priority since the last time it + * successfully did so. + */ + synchronized public int getSchedulingOpportunities(Priority priority) { + return this.schedulingOpportunities.count(priority); + } + + synchronized void resetReReservations(Priority priority) { + this.reReservations.setCount(priority, 0); + } + + synchronized void addReReservation(Priority priority) { + this.reReservations.add(priority); + } + + synchronized public int getReReservations(Priority priority) { + return this.reReservations.count(priority); + } + + public synchronized int getNumReservedContainers(Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + return (reservedContainers == null) ? 0 : reservedContainers.size(); + } + + /** + * Get total current reservations. + * Used only by unit tests + * @return total current reservations + */ + @Stable + @Private + public synchronized Resource getCurrentReservation() { + return currentReservation; + } + + public synchronized RMContainer reserve(FSSchedulerNode node, Priority priority, + RMContainer rmContainer, Container container) { + // Create RMContainer if necessary + if (rmContainer == null) { + rmContainer = + new RMContainerImpl(container, getApplicationAttemptId(), + node.getNodeID(), rmContext.getDispatcher().getEventHandler(), + rmContext.getContainerAllocationExpirer()); + + Resources.addTo(currentReservation, container.getResource()); + + // Reset the re-reservation count + resetReReservations(priority); + } else { + // Note down the re-reservation + addReReservation(priority); + } + rmContainer.handle(new RMContainerReservedEvent(container.getId(), + container.getResource(), node.getNodeID(), priority)); + + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers == null) { + reservedContainers = new HashMap(); + this.reservedContainers.put(priority, reservedContainers); + } + reservedContainers.put(node.getNodeID(), rmContainer); + + LOG.info("Application " + getApplicationId() + + " reserved container " + rmContainer + + " on node " + node + ", currently has " + reservedContainers.size() + + " at priority " + priority + + "; currentReservation " + currentReservation.getMemory()); + + return rmContainer; + } + + public synchronized void unreserve(FSSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + RMContainer reservedContainer = reservedContainers.remove(node.getNodeID()); + if (reservedContainers.isEmpty()) { + this.reservedContainers.remove(priority); + } + + // Reset the re-reservation count + resetReReservations(priority); + + Resource resource = reservedContainer.getContainer().getResource(); + Resources.subtractFrom(currentReservation, resource); + + LOG.info("Application " + getApplicationId() + " unreserved " + " on node " + + node + ", currently has " + reservedContainers.size() + " at priority " + + priority + "; currentReservation " + currentReservation); + } + + /** + * Has the application reserved the given node at the + * given priority? + * @param node node to be checked + * @param priority priority of reserved container + * @return true is reserved, false if not + */ + public synchronized boolean isReserved(FSSchedulerNode node, Priority priority) { + Map reservedContainers = + this.reservedContainers.get(priority); + if (reservedContainers != null) { + return reservedContainers.containsKey(node.getNodeID()); + } + return false; + } + + public synchronized float getLocalityWaitFactor( + Priority priority, int clusterNodes) { + // Estimate: Required unique resources (i.e. hosts + racks) + int requiredResources = + Math.max(this.getResourceRequests(priority).size() - 1, 0); + + // waitFactor can't be more than '1' + // i.e. no point skipping more than clustersize opportunities + return Math.min(((float)requiredResources / clusterNodes), 1.0f); + } + + /** + * Get the list of reserved containers + * @return All of the reserved containers. + */ + @Override + public synchronized List getReservedContainers() { + List reservedContainers = new ArrayList(); + for (Map.Entry> e : + this.reservedContainers.entrySet()) { + reservedContainers.addAll(e.getValue().values()); + } + return reservedContainers; + } + + public synchronized void setHeadroom(Resource globalLimit) { + this.resourceLimit = globalLimit; + } + + /** + * Get available headroom in terms of resources for the application's user. + * @return available resource headroom + */ + public synchronized Resource getHeadroom() { + // Corner case to deal with applications being slightly over-limit + if (resourceLimit.getMemory() < 0) { + resourceLimit.setMemory(0); + } + + return resourceLimit; + } + + public Queue getQueue() { + return queue; + } /** * Delay scheduling: We often want to prioritize scheduling of node-local @@ -62,13 +441,6 @@ public class FSSchedulerApp extends Sche // Time of the last container scheduled at the current allowed level Map lastScheduledContainer = new HashMap(); - public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, - String user, Queue queue, ActiveUsersManager activeUsersManager, - RMContext rmContext, ApplicationStore store) { - super(applicationAttemptId, user, queue, activeUsersManager, - rmContext, store); - } - /** * Should be called when an application has successfully scheduled a container, * or when the scheduling locality threshold is relaxed. @@ -78,7 +450,7 @@ public class FSSchedulerApp extends Sche */ synchronized public void resetSchedulingOpportunities(Priority priority) { this.lastScheduledContainer.put(priority, System.currentTimeMillis()); - super.resetSchedulingOpportunities(priority); + this.schedulingOpportunities.setCount(priority, 0); } /** @@ -127,7 +499,7 @@ public class FSSchedulerApp extends Sche } - synchronized public RMContainer allocate(NodeType type, SchedulerNode node, + synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, Container container) { // Update allowed locality level @@ -143,7 +515,42 @@ public class FSSchedulerApp extends Sche this.resetAllowedLocalityLevel(priority, type); } } - return super.allocate(type, node, priority, request, container); + + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (getTotalRequiredResources(priority) <= 0) { + return null; + } + + // Create RMContainer + RMContainer rmContainer = new RMContainerImpl(container, this + .getApplicationAttemptId(), node.getNodeID(), this.rmContext + .getDispatcher().getEventHandler(), this.rmContext + .getContainerAllocationExpirer()); + + // Add it to allContainers list. + newlyAllocatedContainers.add(rmContainer); + liveContainers.put(container.getId(), rmContainer); + + // Update consumption and track allocations + appSchedulingInfo.allocate(type, node, priority, request, container); + Resources.addTo(currentConsumption, container.getResource()); + + // Inform the container + rmContainer.handle( + new RMContainerEvent(container.getId(), RMContainerEventType.START)); + + if (LOG.isDebugEnabled()) { + LOG.debug("allocate: applicationAttemptId=" + + container.getId().getApplicationAttemptId() + + " container=" + container.getId() + " host=" + + container.getNodeId().getHost() + " type=" + type); + } + RMAuditLogger.logSuccess(getUser(), + AuditConstants.ALLOC_CONTAINER, "SchedulerApp", + getApplicationId(), container.getId()); + + return rmContainer; } /** Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Tue Jul 17 01:51:07 2012 @@ -63,9 +63,7 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -116,12 +114,12 @@ public class FairScheduler implements Re // This stores per-application scheduling information, indexed by // attempt ID's for fast lookup. - protected Map applications - = new HashMap(); + protected Map applications + = new HashMap(); // Nodes in the cluster, indexed by NodeId - private Map nodes = - new ConcurrentHashMap(); + private Map nodes = + new ConcurrentHashMap(); // Aggregate capacity of the cluster private Resource clusterCapacity = @@ -158,7 +156,7 @@ public class FairScheduler implements Re } private RMContainer getRMContainer(ContainerId containerId) { - SchedulerApp application = + FSSchedulerApp application = applications.get(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } @@ -294,7 +292,8 @@ public class FairScheduler implements Re if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) return; - Map apps = new HashMap(); + Map apps = + new HashMap(); Map queues = new HashMap(); // Collect running containers from over-scheduled queues @@ -526,7 +525,7 @@ public class FairScheduler implements Re LOG.info("Application " + applicationAttemptId + " is done." + " finalState=" + rmAppAttemptFinalState); - SchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = applications.get(applicationAttemptId); if (application == null) { LOG.info("Unknown application " + applicationAttemptId + " has completed!"); @@ -576,7 +575,7 @@ public class FairScheduler implements Re // Get the application for the finished container ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - SchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = applications.get(applicationAttemptId); if (application == null) { LOG.info("Container " + container + " of" + " unknown application " + applicationAttemptId + @@ -585,7 +584,7 @@ public class FairScheduler implements Re } // Get the node on which the container was allocated - SchedulerNode node = nodes.get(container.getNodeId()); + FSSchedulerNode node = nodes.get(container.getNodeId()); if (rmContainer.getState() == RMContainerState.RESERVED) { application.unreserve(node, rmContainer.getReservedPriority()); @@ -602,7 +601,7 @@ public class FairScheduler implements Re } private synchronized void addNode(RMNode node) { - this.nodes.put(node.getNodeID(), new SchedulerNode(node)); + this.nodes.put(node.getNodeID(), new FSSchedulerNode(node)); Resources.addTo(clusterCapacity, node.getTotalCapability()); LOG.info("Added node " + node.getNodeAddress() + @@ -610,7 +609,7 @@ public class FairScheduler implements Re } private synchronized void removeNode(RMNode rmNode) { - SchedulerNode node = this.nodes.get(rmNode.getNodeID()); + FSSchedulerNode node = this.nodes.get(rmNode.getNodeID()); Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability()); // Remove running containers @@ -643,7 +642,7 @@ public class FairScheduler implements Re List ask, List release) { // Make sure this application exists - SchedulerApp application = applications.get(appAttemptId); + FSSchedulerApp application = applications.get(appAttemptId); if (application == null) { LOG.info("Calling allocate on removed " + "or non existant application " + appAttemptId); @@ -704,10 +703,10 @@ public class FairScheduler implements Re * Process a container which has launched on a node, as reported by the * node. */ - private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { // Get the application for the finished container ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - SchedulerApp application = applications.get(applicationAttemptId); + FSSchedulerApp application = applications.get(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + @@ -726,7 +725,7 @@ public class FairScheduler implements Re List completedContainers) { LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity); eventLog.log("HEARTBEAT", nm.getHostName()); - SchedulerNode node = nodes.get(nm.getNodeID()); + FSSchedulerNode node = nodes.get(nm.getNodeID()); // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { @@ -749,7 +748,7 @@ public class FairScheduler implements Re // already, we try to complete the reservation. RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - SchedulerApp reservedApplication = + FSSchedulerApp reservedApplication = applications.get(reservedContainer.getApplicationAttemptId()); // Try to fulfill the reservation @@ -787,7 +786,7 @@ public class FairScheduler implements Re @Override public SchedulerNodeReport getNodeReport(NodeId nodeId) { - SchedulerNode node = nodes.get(nodeId); + FSSchedulerNode node = nodes.get(nodeId); return node == null ? null : new SchedulerNodeReport(node); } Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Tue Jul 17 01:51:07 2012 @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.w3c.dom.Document; import org.w3c.dom.Element; import org.w3c.dom.Node; @@ -422,7 +421,7 @@ public class QueueManager { /** * Remove an app */ - public synchronized void removeJob(SchedulerApp app) { + public synchronized void removeJob(FSSchedulerApp app) { getQueue(app.getQueueName()).removeJob(app); } Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Tue Jul 17 01:51:07 2012 @@ -23,7 +23,6 @@ import org.apache.hadoop.classification. import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; /** * A Schedulable represents an entity that can launch tasks, such as a job @@ -104,7 +103,7 @@ abstract class Schedulable { * already exists on this node, and the schedulable should fulfill that * reservation if possible. */ - public abstract Resource assignContainer(SchedulerNode node, boolean reserved); + public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved); /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare) { Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Jul 17 01:51:07 2012 @@ -71,11 +71,11 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; @@ -103,14 +103,14 @@ public class FifoScheduler implements Re private final static List EMPTY_CONTAINER_LIST = Arrays.asList(EMPTY_CONTAINER_ARRAY); private RMContext rmContext; - private Map nodes = new ConcurrentHashMap(); + private Map nodes = new ConcurrentHashMap(); private boolean initialized; private Resource minimumAllocation; private Resource maximumAllocation; - private Map applications - = new TreeMap(); + private Map applications + = new TreeMap(); private ActiveUsersManager activeUsersManager; @@ -223,7 +223,7 @@ public class FifoScheduler implements Re public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, List release) { - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.error("Calling allocate on removed " + "or non existant application " + applicationAttemptId); @@ -276,7 +276,7 @@ public class FifoScheduler implements Re } } - private SchedulerApp getApplication( + private FiCaSchedulerApp getApplication( ApplicationAttemptId applicationAttemptId) { return applications.get(applicationAttemptId); } @@ -284,19 +284,19 @@ public class FifoScheduler implements Re @Override public SchedulerAppReport getSchedulerAppInfo( ApplicationAttemptId applicationAttemptId) { - SchedulerApp app = getApplication(applicationAttemptId); + FiCaSchedulerApp app = getApplication(applicationAttemptId); return app == null ? null : new SchedulerAppReport(app); } - private SchedulerNode getNode(NodeId nodeId) { + private FiCaSchedulerNode getNode(NodeId nodeId) { return nodes.get(nodeId); } private synchronized void addApplication(ApplicationAttemptId appAttemptId, String user) { // TODO: Fix store - SchedulerApp schedulerApp = - new SchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, + FiCaSchedulerApp schedulerApp = + new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext, null); applications.put(appAttemptId, schedulerApp); metrics.submitApp(user, appAttemptId.getAttemptId()); @@ -311,7 +311,7 @@ public class FifoScheduler implements Re ApplicationAttemptId applicationAttemptId, RMAppAttemptState rmAppAttemptFinalState) throws IOException { - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { throw new IOException("Unknown application " + applicationAttemptId + " has completed!"); @@ -344,15 +344,15 @@ public class FifoScheduler implements Re * * @param node node on which resources are available to be allocated */ - private void assignContainers(SchedulerNode node) { + private void assignContainers(FiCaSchedulerNode node) { LOG.debug("assignContainers:" + " node=" + node.getRMNode().getNodeAddress() + " #applications=" + applications.size()); // Try to assign containers to applications in fifo order - for (Map.Entry e : applications + for (Map.Entry e : applications .entrySet()) { - SchedulerApp application = e.getValue(); + FiCaSchedulerApp application = e.getValue(); LOG.debug("pre-assignContainers"); application.showRequests(); synchronized (application) { @@ -383,15 +383,15 @@ public class FifoScheduler implements Re // Update the applications' headroom to correctly take into // account the containers assigned in this update. - for (SchedulerApp application : applications.values()) { + for (FiCaSchedulerApp application : applications.values()) { application.setHeadroom(Resources.subtract(clusterResource, usedResource)); } } - private int getMaxAllocatableContainers(SchedulerApp application, - Priority priority, SchedulerNode node, NodeType type) { + private int getMaxAllocatableContainers(FiCaSchedulerApp application, + Priority priority, FiCaSchedulerNode node, NodeType type) { ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, SchedulerNode.ANY); + application.getResourceRequest(priority, FiCaSchedulerNode.ANY); int maxContainers = offSwitchRequest.getNumContainers(); if (type == NodeType.OFF_SWITCH) { @@ -420,8 +420,8 @@ public class FifoScheduler implements Re } - private int assignContainersOnNode(SchedulerNode node, - SchedulerApp application, Priority priority + private int assignContainersOnNode(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority ) { // Data-local int nodeLocalContainers = @@ -447,8 +447,8 @@ public class FifoScheduler implements Re return (nodeLocalContainers + rackLocalContainers + offSwitchContainers); } - private int assignNodeLocalContainers(SchedulerNode node, - SchedulerApp application, Priority priority) { + private int assignNodeLocalContainers(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(priority, node.getRMNode().getNodeAddress()); @@ -473,15 +473,15 @@ public class FifoScheduler implements Re return assignedContainers; } - private int assignRackLocalContainers(SchedulerNode node, - SchedulerApp application, Priority priority) { + private int assignRackLocalContainers(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = application.getResourceRequest(priority, node.getRMNode().getRackName()); if (request != null) { // Don't allocate on this rack if the application doens't need containers ResourceRequest offSwitchRequest = - application.getResourceRequest(priority, SchedulerNode.ANY); + application.getResourceRequest(priority, FiCaSchedulerNode.ANY); if (offSwitchRequest.getNumContainers() <= 0) { return 0; } @@ -498,11 +498,11 @@ public class FifoScheduler implements Re return assignedContainers; } - private int assignOffSwitchContainers(SchedulerNode node, - SchedulerApp application, Priority priority) { + private int assignOffSwitchContainers(FiCaSchedulerNode node, + FiCaSchedulerApp application, Priority priority) { int assignedContainers = 0; ResourceRequest request = - application.getResourceRequest(priority, SchedulerNode.ANY); + application.getResourceRequest(priority, FiCaSchedulerNode.ANY); if (request != null) { assignedContainers = assignContainer(node, application, priority, @@ -511,7 +511,7 @@ public class FifoScheduler implements Re return assignedContainers; } - private int assignContainer(SchedulerNode node, SchedulerApp application, + private int assignContainer(FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, int assignableContainers, ResourceRequest request, NodeType type) { LOG.debug("assignContainers:" + @@ -577,7 +577,7 @@ public class FifoScheduler implements Re private synchronized void nodeUpdate(RMNode rmNode, List newlyLaunchedContainers, List completedContainers) { - SchedulerNode node = getNode(rmNode.getNodeID()); + FiCaSchedulerNode node = getNode(rmNode.getNodeID()); // Processing the newly launched containers for (ContainerStatus launchedContainer : newlyLaunchedContainers) { @@ -667,10 +667,10 @@ public class FifoScheduler implements Re } } - private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) { // Get the application for the finished container ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + @@ -696,10 +696,10 @@ public class FifoScheduler implements Re // Get the application for the finished container Container container = rmContainer.getContainer(); ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId(); - SchedulerApp application = getApplication(applicationAttemptId); + FiCaSchedulerApp application = getApplication(applicationAttemptId); // Get the node on which the container was allocated - SchedulerNode node = getNode(container.getNodeId()); + FiCaSchedulerNode node = getNode(container.getNodeId()); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + @@ -729,7 +729,7 @@ public class FifoScheduler implements Re private Resource usedResource = recordFactory.newRecordInstance(Resource.class); private synchronized void removeNode(RMNode nodeInfo) { - SchedulerNode node = getNode(nodeInfo.getNodeID()); + FiCaSchedulerNode node = getNode(nodeInfo.getNodeID()); if (node == null) { return; } @@ -761,7 +761,7 @@ public class FifoScheduler implements Re } private synchronized void addNode(RMNode nodeManager) { - this.nodes.put(nodeManager.getNodeID(), new SchedulerNode(nodeManager)); + this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager)); Resources.addTo(clusterResource, nodeManager.getTotalCapability()); } @@ -778,12 +778,12 @@ public class FifoScheduler implements Re @Override public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) { - SchedulerNode node = getNode(nodeId); + FiCaSchedulerNode node = getNode(nodeId); return node == null ? null : new SchedulerNodeReport(node); } private RMContainer getRMContainer(ContainerId containerId) { - SchedulerApp application = + FiCaSchedulerApp application = getApplication(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java Tue Jul 17 01:51:07 2012 @@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.api import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.util.BuilderUtils; @Private @@ -72,7 +72,7 @@ public class NodeManager implements Cont Resource used = recordFactory.newRecordInstance(Resource.class); final ResourceTrackerService resourceTrackerService; - final SchedulerNode schedulerNode; + final FiCaSchedulerNode schedulerNode; final Map> containers = new HashMap>(); @@ -98,7 +98,7 @@ public class NodeManager implements Cont request.setNodeId(this.nodeId); resourceTrackerService.registerNodeManager(request) .getRegistrationResponse(); - this.schedulerNode = new SchedulerNode(rmContext.getRMNodes().get( + this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get( this.nodeId)); // Sanity check Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Tue Jul 17 01:51:07 2012 @@ -40,8 +40,8 @@ import org.apache.hadoop.yarn.factory.pr import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -111,8 +111,8 @@ public class TestApplicationLimits { LOG.info("Setup top-level queues a and b"); } - private SchedulerApp getMockApplication(int appId, String user) { - SchedulerApp application = mock(SchedulerApp.class); + private FiCaSchedulerApp getMockApplication(int appId, String user) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); ApplicationAttemptId applicationAttemptId = TestUtils.getMockApplicationAttemptId(appId, 0); doReturn(applicationAttemptId.getApplicationId()). @@ -209,7 +209,7 @@ public class TestApplicationLimits { int APPLICATION_ID = 0; // Submit first application - SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_0, user_0, A); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -217,7 +217,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit second application - SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_1, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -225,7 +225,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit third application, should remain pending - SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_2, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -240,7 +240,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_0)); // Submit another one for user_0 - SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_3, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -251,7 +251,7 @@ public class TestApplicationLimits { doReturn(3).when(queue).getMaximumActiveApplications(); // Submit first app for user_1 - SchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1); queue.submitApplication(app_4, user_1, A); assertEquals(3, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -261,7 +261,7 @@ public class TestApplicationLimits { assertEquals(0, queue.getNumPendingApplications(user_1)); // Submit second app for user_1, should block due to queue-limit - SchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); + FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1); queue.submitApplication(app_5, user_1, A); assertEquals(3, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -290,7 +290,7 @@ public class TestApplicationLimits { doReturn(2).when(queue).getMaximumActiveApplications(); // Submit first application - SchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_0, user_0, A); assertEquals(1, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -299,7 +299,7 @@ public class TestApplicationLimits { assertTrue(queue.activeApplications.contains(app_0)); // Submit second application - SchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_1, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(0, queue.getNumPendingApplications()); @@ -308,7 +308,7 @@ public class TestApplicationLimits { assertTrue(queue.activeApplications.contains(app_1)); // Submit third application, should remain pending - SchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_2, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(1, queue.getNumPendingApplications()); @@ -317,7 +317,7 @@ public class TestApplicationLimits { assertTrue(queue.pendingApplications.contains(app_2)); // Submit fourth application, should remain pending - SchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); + FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0); queue.submitApplication(app_3, user_0, A); assertEquals(2, queue.getNumActiveApplications()); assertEquals(2, queue.getNumPendingApplications()); @@ -393,7 +393,7 @@ public class TestApplicationLimits { String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB); final String user_0 = "user_0"; final String user_1 = "user_1"; @@ -408,8 +408,8 @@ public class TestApplicationLimits { // and check headroom final ApplicationAttemptId appAttemptId_0_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0_0 = - spy(new SchedulerApp(appAttemptId_0_0, user_0, queue, + FiCaSchedulerApp app_0_0 = + spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_0, user_0, A); @@ -427,8 +427,8 @@ public class TestApplicationLimits { // Submit second application from user_0, check headroom final ApplicationAttemptId appAttemptId_0_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_0_1 = - spy(new SchedulerApp(appAttemptId_0_1, user_0, queue, + FiCaSchedulerApp app_0_1 = + spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_0_1, user_0, A); @@ -446,8 +446,8 @@ public class TestApplicationLimits { // Submit first application from user_1, check for new headroom final ApplicationAttemptId appAttemptId_1_0 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_1_0 = - spy(new SchedulerApp(appAttemptId_1_0, user_1, queue, + FiCaSchedulerApp app_1_0 = + spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue, queue.getActiveUsersManager(), rmContext, null)); queue.submitApplication(app_1_0, user_1, A); Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Tue Jul 17 01:51:07 2012 @@ -62,8 +62,8 @@ import org.apache.hadoop.yarn.server.res import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.junit.After; import org.junit.Before; @@ -171,14 +171,14 @@ public class TestLeafQueue { @Override public Container answer(InvocationOnMock invocation) throws Throwable { - final SchedulerApp application = - (SchedulerApp)(invocation.getArguments()[0]); + final FiCaSchedulerApp application = + (FiCaSchedulerApp)(invocation.getArguments()[0]); final ContainerId containerId = TestUtils.getMockContainerId(application); Container container = TestUtils.getMockContainer( containerId, - ((SchedulerNode)(invocation.getArguments()[1])).getNodeID(), + ((FiCaSchedulerNode)(invocation.getArguments()[1])).getNodeID(), (Resource)(invocation.getArguments()[2]), ((Priority)invocation.getArguments()[3])); return container; @@ -186,8 +186,8 @@ public class TestLeafQueue { } ). when(queue).createContainer( - any(SchedulerApp.class), - any(SchedulerNode.class), + any(FiCaSchedulerApp.class), + any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class) ); @@ -195,7 +195,7 @@ public class TestLeafQueue { // 2. Stub out LeafQueue.parent.completedContainer CSQueue parent = queue.getParent(); doNothing().when(parent).completedContainer( - any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class), + any(Resource.class), any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), any(RMContainer.class), any(ContainerStatus.class), any(RMContainerEventType.class)); @@ -238,22 +238,22 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, B); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, B); // same user // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -284,14 +284,14 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); - SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_d, d, null, + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_d, d, null, rmContext, null); d.submitApplication(app_0, user_d, D); // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(0, 2); - SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_d, d, null, + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_d, d, null, rmContext, null); d.submitApplication(app_1, user_d, D); // same user } @@ -309,7 +309,7 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils .getMockApplicationAttemptId(0, 1); - SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_0, a, null, + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, null, rmContext, null); a.submitApplication(app_0, user_0, B); @@ -324,7 +324,7 @@ public class TestLeafQueue { // Attempt the same application again final ApplicationAttemptId appAttemptId_1 = TestUtils .getMockApplicationAttemptId(0, 2); - SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_0, a, null, + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, null, rmContext, null); a.submitApplication(app_1, user_0, B); // same user @@ -359,22 +359,22 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_0, A); // same user // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -483,30 +483,30 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -576,30 +576,30 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -687,35 +687,35 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_0, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_1, user_0, A); // same user final ApplicationAttemptId appAttemptId_2 = TestUtils.getMockApplicationAttemptId(2, 0); - SchedulerApp app_2 = - new SchedulerApp(appAttemptId_2, user_1, a, + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_2, user_1, A); final ApplicationAttemptId appAttemptId_3 = TestUtils.getMockApplicationAttemptId(3, 0); - SchedulerApp app_3 = - new SchedulerApp(appAttemptId_3, user_2, a, + FiCaSchedulerApp app_3 = + new FiCaSchedulerApp(appAttemptId_3, user_2, a, a.getActiveUsersManager(), rmContext, null); a.submitApplication(app_3, user_2, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); final int numNodes = 1; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -862,21 +862,21 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -961,23 +961,23 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -1060,24 +1060,24 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_0, user_0, A); final ApplicationAttemptId appAttemptId_1 = TestUtils.getMockApplicationAttemptId(1, 0); - SchedulerApp app_1 = - new SchedulerApp(appAttemptId_1, user_1, a, + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_1, a, mock(ActiveUsersManager.class), rmContext, null); a.submitApplication(app_1, user_1, A); // Setup some nodes String host_0 = "host_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 4*GB); String host_1 = "host_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 4*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (4*GB)); @@ -1175,23 +1175,23 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); String host_1 = "host_1"; String rack_1 = "rack_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); String host_2 = "host_2"; String rack_2 = "rack_2"; - SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -1284,7 +1284,7 @@ public class TestLeafQueue { assertEquals(1, app_0.getTotalRequiredResources(priority)); String host_3 = "host_3"; // on rack_1 - SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); + FiCaSchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB); assignment = a.assignContainers(clusterResource, node_3); verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), @@ -1305,23 +1305,23 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0 = "host_0"; String rack_0 = "rack_0"; - SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); String host_1 = "host_1"; String rack_1 = "rack_1"; - SchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); String host_2 = "host_2"; String rack_2 = "rack_2"; - SchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); @@ -1435,22 +1435,22 @@ public class TestLeafQueue { // Submit applications final ApplicationAttemptId appAttemptId_0 = TestUtils.getMockApplicationAttemptId(0, 0); - SchedulerApp app_0 = - spy(new SchedulerApp(appAttemptId_0, user_0, a, + FiCaSchedulerApp app_0 = + spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, mock(ActiveUsersManager.class), rmContext, null)); a.submitApplication(app_0, user_0, A); // Setup some nodes and racks String host_0_0 = "host_0_0"; String rack_0 = "rack_0"; - SchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB); String host_0_1 = "host_0_1"; - SchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); + FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB); String host_1_0 = "host_1_0"; String rack_1 = "rack_1"; - SchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); + FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB); final int numNodes = 3; Resource clusterResource = Resources.createResource(numNodes * (8*GB)); Modified: hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1362334&r1=1362333&r2=1362334&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original) +++ hadoop/common/branches/branch-2.1.0-alpha/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Tue Jul 17 01:51:07 2012 @@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnC import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -99,22 +99,22 @@ public class TestParentQueue { LOG.info("Setup top-level queues a and b"); } - private SchedulerApp getMockApplication(int appId, String user) { - SchedulerApp application = mock(SchedulerApp.class); + private FiCaSchedulerApp getMockApplication(int appId, String user) { + FiCaSchedulerApp application = mock(FiCaSchedulerApp.class); doReturn(user).when(application).getUser(); doReturn(Resources.createResource(0)).when(application).getHeadroom(); return application; } private void stubQueueAllocation(final CSQueue queue, - final Resource clusterResource, final SchedulerNode node, + final Resource clusterResource, final FiCaSchedulerNode node, final int allocation) { stubQueueAllocation(queue, clusterResource, node, allocation, NodeType.NODE_LOCAL); } private void stubQueueAllocation(final CSQueue queue, - final Resource clusterResource, final SchedulerNode node, + final Resource clusterResource, final FiCaSchedulerNode node, final int allocation, final NodeType type) { // Simulate the queue allocation @@ -132,7 +132,7 @@ public class TestParentQueue { ((ParentQueue)queue).allocateResource(clusterResource, allocatedResource); } else { - SchedulerApp app1 = getMockApplication(0, ""); + FiCaSchedulerApp app1 = getMockApplication(0, ""); ((LeafQueue)queue).allocateResource(clusterResource, app1, allocatedResource); } @@ -198,9 +198,9 @@ public class TestParentQueue { final int memoryPerNode = 10; final int numNodes = 2; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -224,9 +224,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_1); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -237,9 +237,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -250,9 +250,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -263,9 +263,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_1); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -346,11 +346,11 @@ public class TestParentQueue { final int memoryPerNode = 10; final int numNodes = 3; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_2 = + FiCaSchedulerNode node_2 = TestUtils.getMockNode("host_2", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -401,11 +401,11 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -427,13 +427,13 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_2); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -457,9 +457,9 @@ public class TestParentQueue { final int memoryPerNode = 10; final int numNodes = 2; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -484,9 +484,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_1); InOrder allocationOrder = inOrder(a, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -498,9 +498,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -523,9 +523,9 @@ public class TestParentQueue { final int memoryPerNode = 10; final int numNodes = 2; - SchedulerNode node_0 = + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - SchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); final Resource clusterResource = @@ -550,9 +550,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_1); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -564,9 +564,9 @@ public class TestParentQueue { root.assignContainers(clusterResource, node_0); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(SchedulerNode.class)); + any(FiCaSchedulerNode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource);