Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E27F96650 for ; Fri, 20 May 2011 08:10:00 +0000 (UTC) Received: (qmail 38665 invoked by uid 500); 20 May 2011 08:10:00 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 38610 invoked by uid 500); 20 May 2011 08:10:00 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 38602 invoked by uid 99); 20 May 2011 08:09:59 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 May 2011 08:09:59 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 May 2011 08:09:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 713452388A39; Fri, 20 May 2011 08:09:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1125273 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ ... 
Date: Fri, 20 May 2011 08:09:31 -0000 To: mapreduce-commits@hadoop.apache.org From: sharad@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110520080931.713452388A39@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: sharad Date: Fri May 20 08:09:30 2011 New Revision: 1125273 URL: http://svn.apache.org/viewvc?rev=1125273&view=rev Log: Refactored RMContainerAllocator to release unused containers. Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1125273&r1=1125272&r2=1125273&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original) +++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri May 20 08:09:30 2011 @@ -3,6 +3,7 @@ Hadoop MapReduce Change Log Trunk (unreleased changes) MAPREDUCE-279 + Refactored RMContainerAllocator to release unused containers. 
(sharad) Fix null pointer exception in kill task attempt (mahadev) Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java?rev=1125273&r1=1125272&r2=1125273&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java Fri May 20 08:09:30 2011 @@ -63,4 +63,52 @@ public class ContainerLauncherEvent public String toString() { return super.toString() + " for taskAttempt " + taskAttemptID; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((containerID == null) ? 0 : containerID.hashCode()); + result = prime * result + + ((containerMgrAddress == null) ? 0 : containerMgrAddress.hashCode()); + result = prime * result + + ((containerToken == null) ? 0 : containerToken.hashCode()); + result = prime * result + + ((taskAttemptID == null) ? 
0 : taskAttemptID.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + ContainerLauncherEvent other = (ContainerLauncherEvent) obj; + if (containerID == null) { + if (other.containerID != null) + return false; + } else if (!containerID.equals(other.containerID)) + return false; + if (containerMgrAddress == null) { + if (other.containerMgrAddress != null) + return false; + } else if (!containerMgrAddress.equals(other.containerMgrAddress)) + return false; + if (containerToken == null) { + if (other.containerToken != null) + return false; + } else if (!containerToken.equals(other.containerToken)) + return false; + if (taskAttemptID == null) { + if (other.taskAttemptID != null) + return false; + } else if (!taskAttemptID.equals(other.taskAttemptID)) + return false; + return true; + } + } Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1125273&r1=1125272&r2=1125273&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Fri May 20 08:09:30 2011 @@ -36,6 +36,8 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.MRAppMasterConstants; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import 
org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; @@ -208,7 +210,10 @@ public class ContainerLauncherImpl exten // and not yet processed if (eventQueue.contains(event)) { eventQueue.remove(event); // TODO: Any synchro needed? - // k: raise any event? + //deallocate the container + context.getEventHandler().handle( + new ContainerAllocatorEvent(event.getTaskAttemptID(), + ContainerAllocator.EventType.CONTAINER_DEALLOCATE)); } else { try { ContainerManager proxy = Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java?rev=1125273&r1=1125272&r2=1125273&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ContainerRequestEvent.java Fri May 20 08:09:30 2011 @@ -45,7 +45,7 @@ public class ContainerRequestEvent exten ContainerRequestEvent(TaskAttemptId attemptID, Resource capability, int priority) { - this(attemptID, capability, priority, null, null); + this(attemptID, capability, priority, new String[0], new String[0]); this.earlierAttemptFailed = true; } Modified: 
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1125273&r1=1125272&r2=1125273&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri May 20 08:09:30 2011 @@ -19,225 +19,107 @@ package org.apache.hadoop.mapreduce.v2.app.rm; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.AppContext; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationMaster; -import org.apache.hadoop.yarn.api.records.ApplicationState; -import org.apache.hadoop.yarn.api.records.ApplicationStatus; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; /** * Allocates the container from the ResourceManager scheduler. */ -public class RMContainerAllocator extends RMCommunicator +public class RMContainerAllocator extends RMContainerRequestor implements ContainerAllocator { private static final Log LOG = LogFactory.getLog(RMContainerAllocator.class); - private static final String ANY = "*"; - private int lastResponseID; - private final RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); - - //mapping for assigned containers - private final Map assignedMap = - new HashMap(); - - private final Map>> localRequestsQueue = - new TreeMap>>(); - - //Key -> Priority - //Value -> Map - //Key->ResourceName (e.g., hostname, rackname, *) - //Value->Map - //Key->Resource Capability - //Value->ResourceReqeust - private final Map>> - remoteRequestsTable = - new TreeMap>>(); - - private final Set ask = new TreeSet(); - private final Set release = new TreeSet(); + //holds information about the assigned containers to task attempts + private final AssignedRequests assignedRequests = new AssignedRequests(); + + //holds pending requests to be fulfilled by RM + private final PendingRequests 
pendingRequests = new PendingRequests(); + + private int containersAllocated = 0; + private int mapsAssigned = 0; + private int reducesAssigned = 0; + private int containersReleased = 0; + private int hostLocalAssigned = 0; + private int rackLocalAssigned = 0; public RMContainerAllocator(ClientService clientService, AppContext context) { super(clientService, context); } - // TODO: Need finer synchronization. @Override protected synchronized void heartbeat() throws Exception { - assign(getResources()); + List allocatedContainers = getResources(); + if (allocatedContainers.size() > 0) { + LOG.info("Before Assign: " + getStat()); + pendingRequests.assign(allocatedContainers); + LOG.info("After Assign: " + getStat()); + } + } + + @Override + public void stop() { + super.stop(); + LOG.info("Final Stats: " + getStat()); } @Override public synchronized void handle(ContainerAllocatorEvent event) { LOG.info("Processing the event " + event.toString()); - //TODO: can be replaced by switch instead of if-else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { - requestContainer((ContainerRequestEvent) event); + pendingRequests.add((ContainerRequestEvent) event); } else if ( event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) { - //TODO: handle deallocation - } - } - - protected synchronized void requestContainer(ContainerRequestEvent event) { - //add to the localRequestsQueue - //localRequests Queue is hashed by Resource and Priority for easy lookups - Map> eventMap = - this.localRequestsQueue.get(event.getPriority()); - if (eventMap == null) { - eventMap = new HashMap>(); - this.localRequestsQueue.put(event.getPriority(), eventMap); - } - - LinkedList eventList = - eventMap.get(event.getCapability()); - if (eventList == null) { - eventList = new LinkedList(); - eventMap.put(event.getCapability(), eventList); - } - eventList.add(event); - - if (event.getEarlierAttemptFailed()) { - addResourceRequest(event.getPriority(), ANY, 
event.getCapability()); - } else { - - // Create resource requests - for (String host : event.getHosts()) { - // Data-local - addResourceRequest(event.getPriority(), host, event.getCapability()); + TaskAttemptId aId = event.getAttemptID(); + + boolean removed = pendingRequests.remove(aId); + if (!removed) { + Container container = assignedRequests.get(aId); + if (container != null) { + removed = true; + assignedRequests.remove(aId); + containersReleased++; + release(container); + } } - - // Nothing Rack-local for now - for (String rack : event.getRacks()) { - addResourceRequest(event.getPriority(), rack, event.getCapability()); + if (!removed) { + LOG.error("Could not deallocate container for task attemptId " + + aId); } - - // Off-switch - addResourceRequest(event.getPriority(), ANY, event.getCapability()); } } - private void addResourceRequest(Priority priority, String resourceName, - Resource capability) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); - if (remoteRequests == null) { - remoteRequests = new HashMap>(); - this.remoteRequestsTable.put(priority, remoteRequests); - LOG.info("Added priority=" + priority); - } - Map reqMap = remoteRequests.get(resourceName); - if (reqMap == null) { - reqMap = new HashMap(); - remoteRequests.put(resourceName, reqMap); - } - ResourceRequest remoteRequest = reqMap.get(capability); - if (remoteRequest == null) { - remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class); - remoteRequest.setPriority(priority); - remoteRequest.setHostName(resourceName); - remoteRequest.setCapability(capability); - remoteRequest.setNumContainers(0); - reqMap.put(capability, remoteRequest); - } - remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); - - // Note this down for next interaction with ResourceManager - ask.add(remoteRequest); - LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId() - + " priority=" + priority.getPriority() + " resourceName=" + resourceName - + " 
numContainers=" + remoteRequest.getNumContainers() + " #asks=" - + ask.size()); + private String getStat() { + return "PendingMaps:" + pendingRequests.maps.size() + + " PendingReduces:" + pendingRequests.reduces.size() + + " containersAllocated:" + containersAllocated + + " mapsAssigned:" + mapsAssigned + + " reducesAssigned:" + reducesAssigned + + " containersReleased:" + containersReleased + + " hostLocalAssigned:" + hostLocalAssigned + + " rackLocalAssigned:" + rackLocalAssigned; } - - private void decResourceRequest(Priority priority, String resourceName, - Resource capability) { - Map> remoteRequests = - this.remoteRequestsTable.get(priority); - Map reqMap = remoteRequests.get(resourceName); - ResourceRequest remoteRequest = reqMap.get(capability); - - LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId() - + " priority=" + priority.getPriority() + " resourceName=" + resourceName - + " numContainers=" + remoteRequest.getNumContainers() + " #asks=" - + ask.size()); - - remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1); - if (remoteRequest.getNumContainers() == 0) { - reqMap.remove(capability); - if (reqMap.size() == 0) { - remoteRequests.remove(resourceName); - } - if (remoteRequests.size() == 0) { - remoteRequestsTable.remove(priority); - } - //remove from ask if it may have - ask.remove(remoteRequest); - } else { - ask.add(remoteRequest);//this will override the request if ask doesn't - //already have it. 
- } - - LOG.info("AFTER decResourceRequest:" + " applicationId=" - + applicationId.getId() + " priority=" + priority.getPriority() - + " resourceName=" + resourceName + " numContainers=" - + remoteRequest.getNumContainers() + " #asks=" + ask.size()); - } - + private List getResources() throws Exception { - ApplicationStatus status = - recordFactory.newRecordInstance(ApplicationStatus.class); - status.setApplicationId(applicationId); - status.setResponseId(lastResponseID); - - AllocateRequest allocateRequest = - recordFactory.newRecordInstance(AllocateRequest.class); - allocateRequest.setApplicationStatus(status); - allocateRequest.addAllAsks(new ArrayList(ask)); - allocateRequest.addAllReleases(new ArrayList(release)); - AllocateResponse allocateResponse = scheduler.allocate(allocateRequest); - AMResponse response = allocateResponse.getAMResponse(); - lastResponseID = response.getResponseId(); - List allContainers = response.getContainerList(); - ask.clear(); - release.clear(); - - LOG.info("getResources() for " + applicationId + ":" + - " ask=" + ask.size() + - " release= "+ release.size() + - " recieved=" + allContainers.size()); + List allContainers = makeRemoteRequest(); List allocatedContainers = new ArrayList(); for (Container cont : allContainers) { if (cont.getState() != ContainerState.COMPLETE) { @@ -245,11 +127,12 @@ public class RMContainerAllocator extend LOG.debug("Received Container :" + cont); } else { LOG.info("Received completed container " + cont); - TaskAttemptId attemptID = assignedMap.remove(cont.getId()); + TaskAttemptId attemptID = assignedRequests.get(cont.getId()); if (attemptID == null) { LOG.error("Container complete event for unknown container id " + cont.getId()); } else { + assignedRequests.remove(attemptID); //send the container completed event to Task attempt eventHandler.handle(new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED)); @@ -260,90 +143,220 @@ public class RMContainerAllocator extend return 
allocatedContainers; } - private void assign(List allocatedContainers) { - // Schedule in priority order - for (Priority priority : localRequestsQueue.keySet()) { - LOG.info("Assigning for priority " + priority); - assign(priority, allocatedContainers); - if (allocatedContainers.isEmpty()) { - break; + private class PendingRequests { + + private Resource mapResourceReqt; + private Resource reduceResourceReqt; + + private final LinkedList earlierFailedMaps = + new LinkedList(); + private final LinkedList earlierFailedReduces = + new LinkedList(); + + private final Map> mapsHostMapping = + new HashMap>(); + private final Map> mapsRackMapping = + new HashMap>(); + private final Map maps = + new LinkedHashMap(); + + private final Map reduces = + new LinkedHashMap(); + + boolean remove(TaskAttemptId tId) { + ContainerRequestEvent req = maps.remove(tId); + if (req == null) { + req = reduces.remove(tId); + } + if (req == null) { + return false; + } else { + decContainerReq(req); + return true; } } - - if (!allocatedContainers.isEmpty()) { - //TODO - //after the assigment, still containers are left - //This can happen if container requests are cancelled by AM, currently - //not there. release the unassigned containers?? 
- - //LOG.info("Releasing container " + allocatedContainer); - //release.add(allocatedContainer); - } - } - - private void assign(Priority priority, List allocatedContainers) { - for (Iterator i=allocatedContainers.iterator(); i.hasNext();) { - Container allocatedContainer = i.next(); - String host = allocatedContainer.getContainerManagerAddress(); - Resource capability = allocatedContainer.getResource(); - - LinkedList requestList = - localRequestsQueue.get(priority).get(capability); - - if (requestList == null) { - LOG.info("No request match at priority " + priority); - return; + + void add(ContainerRequestEvent event) { + + if (event.getAttemptID().getTaskId().getTaskType() == TaskType.MAP) { + if (mapResourceReqt == null) { + mapResourceReqt = event.getCapability(); + } + maps.put(event.getAttemptID(), event); + + if (event.getEarlierAttemptFailed()) { + earlierFailedMaps.add(event.getAttemptID()); + } else { + for (String host : event.getHosts()) { + LinkedList list = mapsHostMapping.get(host); + if (list == null) { + list = new LinkedList(); + mapsHostMapping.put(host, list); + } + list.add(event.getAttemptID()); + LOG.info("Added attempt req to host " + host); + } + for (String rack: event.getRacks()) { + LinkedList list = mapsRackMapping.get(rack); + if (list == null) { + list = new LinkedList(); + mapsRackMapping.put(rack, list); + } + list.add(event.getAttemptID()); + LOG.info("Added attempt req to rack " + rack); + } + } + } else {//reduce + if (reduceResourceReqt == null) { + reduceResourceReqt = event.getCapability(); + } + + if (event.getEarlierAttemptFailed()) { + earlierFailedReduces.add(event.getAttemptID()); + } + reduces.put(event.getAttemptID(), event); } - - ContainerRequestEvent assigned = null; - //walk thru the requestList to see if in any host matches - Iterator it = requestList.iterator(); + + addContainerReq(event); + } + + private void assign(List allocatedContainers) { + Iterator it = allocatedContainers.iterator(); + LOG.info("Got 
allocated containers " + allocatedContainers.size()); + containersAllocated += allocatedContainers.size(); while (it.hasNext()) { - ContainerRequestEvent event = it.next(); - if (event.getEarlierAttemptFailed()) { - // we want to fail fast. Ignore locality for rescheduling - // failed attempts. - assigned = event; - it.remove(); - break; - } - if (Arrays.asList(event.getHosts()).contains(host)) { // TODO: Fix - assigned = event; - it.remove(); - // Update resource requests - for (String hostName : event.getHosts()) { - decResourceRequest(priority, hostName, capability); + Container allocated = it.next(); + ContainerRequestEvent assigned = null; + LOG.info("Assiging container " + allocated); + + //try to assign to earlierFailedMaps if present + while (assigned == null && earlierFailedMaps.size() > 0 && + allocated.getResource().getMemory() >= mapResourceReqt.getMemory()) { + TaskAttemptId tId = earlierFailedMaps.removeFirst(); + if (maps.containsKey(tId)) { + assigned = maps.remove(tId); + mapsAssigned++; + LOG.info("Assigned from earlierFailedMaps"); } - break; } - } - if (assigned == null) {//host didn't match - if (requestList.size() > 0) { - //choose the first one in queue - assigned = requestList.remove(); + + //try to assign to earlierFailedReduces if present + while (assigned == null && earlierFailedReduces.size() > 0 && + allocated.getResource().getMemory() >= reduceResourceReqt.getMemory()) { + TaskAttemptId tId = earlierFailedReduces.removeFirst(); + if (reduces.containsKey(tId)) { + assigned = reduces.remove(tId); + reducesAssigned++; + LOG.info("Assigned from earlierFailedReduces"); + } } - } + + //try to assign to maps if present + //first by host, then by rack, followed by * + while (assigned == null && maps.size() > 0 + && allocated.getResource().getMemory() >= mapResourceReqt.getMemory()) { + String host = allocated.getContainerManagerAddress(); + String[] hostport = host.split(":"); + if (hostport.length == 2) { + host = hostport[0]; + } + 
LinkedList list = mapsHostMapping.get(host); + while (list != null && list.size() > 0) { + LOG.info("Host matched to the request list " + host); + TaskAttemptId tId = list.removeFirst(); + if (maps.containsKey(tId)) { + assigned = maps.remove(tId); + mapsAssigned++; + hostLocalAssigned++; + LOG.info("Assigned based on host match " + host); + } + } + if (assigned == null) { + // TODO: get rack + String rack = ""; + list = mapsRackMapping.get(rack); + while (list != null && list.size() > 0) { + TaskAttemptId tId = list.removeFirst(); + if (maps.containsKey(tId)) { + assigned = maps.remove(tId); + mapsAssigned++; + rackLocalAssigned++; + LOG.info("Assigned based on rack match " + rack); + } + } + if (assigned == null && maps.size() > 0) { + TaskAttemptId tId = maps.keySet().iterator().next(); + assigned = maps.remove(tId); + mapsAssigned++; + LOG.info("Assigned based on * match"); + } + } + } + + //try to assign to reduces if present + if (assigned == null && reduces.size() > 0 + && allocated.getResource().getMemory() >= reduceResourceReqt.getMemory()) { + TaskAttemptId tId = reduces.keySet().iterator().next(); + assigned = reduces.remove(tId); + reducesAssigned++; + LOG.info("Assigned to reduce"); + } + + if (assigned != null) { + + // Update resource requests + decContainerReq(assigned); - if (assigned != null) { - i.remove(); // Remove from allocated Containers list also. 
+ // send the container-assigned event to task attempt + eventHandler.handle(new TaskAttemptContainerAssignedEvent( + assigned.getAttemptID(), allocated.getId(), + allocated.getContainerManagerAddress(), + allocated.getNodeHttpAddress(), + allocated.getContainerToken())); + + assignedRequests.add(allocated, assigned.getAttemptID()); + + LOG.info("Assigned container (" + allocated + ") " + + " to task " + assigned.getAttemptID() + + " on node " + allocated.getContainerManagerAddress()); + } else { + //not assigned to any request, release the container + LOG.info("Releasing unassigned container " + allocated); + containersReleased++; + release(allocated); + } + } + } + } - // Update resource requests - decResourceRequest(priority, ANY, capability); + private static class AssignedRequests { + private final Map containerToAttemptMap = + new HashMap(); + private final Map attemptToContainerMap = + new HashMap(); + + void add(Container container, TaskAttemptId tId) { + LOG.info("Assigned container " + container.getContainerManagerAddress() + + " to " + tId); + containerToAttemptMap.put(container.getId(), tId); + attemptToContainerMap.put(tId, container); + } - // send the container-assigned event to task attempt - eventHandler.handle(new TaskAttemptContainerAssignedEvent( - assigned.getAttemptID(), allocatedContainer.getId(), - allocatedContainer.getContainerManagerAddress(), - allocatedContainer.getNodeHttpAddress(), - allocatedContainer.getContainerToken())); - - assignedMap.put(allocatedContainer.getId(), assigned.getAttemptID()); - - LOG.info("Assigned container (" + allocatedContainer + ") " + - " to task " + assigned.getAttemptID() + " at priority " + priority + - " on node " + allocatedContainer.getContainerManagerAddress()); + boolean remove(TaskAttemptId tId) { + Container container = attemptToContainerMap.remove(tId); + if (container != null) { + containerToAttemptMap.remove(container.getId()); + return true; } + return false; + } + + TaskAttemptId 
get(ContainerId cId) { + return containerToAttemptMap.get(cId); } - } + Container get(TaskAttemptId tId) { + return attemptToContainerMap.get(tId); + } + } } Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1125273&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri May 20 08:09:30 2011 @@ -0,0 +1,200 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +package org.apache.hadoop.mapreduce.v2.app.rm; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.client.ClientService; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AMResponse; +import org.apache.hadoop.yarn.api.records.ApplicationStatus; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; + +/** + * Keeps the data structures to send container requests to RM. 
+ */
public abstract class RMContainerRequestor extends RMCommunicator {

  // Overview: keeps a table of outstanding container requests, accumulates
  // the changed requests ("ask") and the containers to give back ("release")
  // between calls, and ships both to the scheduler in makeRemoteRequest().
  // NOTE(review): none of the collections below are synchronized here;
  // confirm that callers confine access to a single thread.

  private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);

  /** Resource name used for requests that are not tied to a host or rack. */
  static final String ANY = "*";

  /** Response id from the last allocate call; echoed back on the next one. */
  private int lastResponseID;

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  // Outstanding requests, nested as:
  //   Priority
  //     -> resource name (e.g., hostname, rackname, *)
  //       -> resource capability
  //         -> ResourceRequest
  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
      remoteRequestsTable =
          new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();

  /** Requests added or changed since the last successful allocate call. */
  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();

  /** Containers queued via {@link #release(Container)} for the next call. */
  private final Set<Container> release = new TreeSet<Container>();

  public RMContainerRequestor(ClientService clientService, AppContext context) {
    super(clientService, context);
  }

  /**
   * Hook to be implemented by subclasses. It is not invoked anywhere in this
   * class.
   *
   * @throws Exception whatever the implementation chooses to propagate
   */
  protected abstract void heartbeat() throws Exception;

  /**
   * Sends the pending asks and releases to the scheduler in a single allocate
   * call and returns the containers carried by the response.
   *
   * <p>The pending sets are cleared only after the call has returned, so if
   * {@code allocate} throws they are left untouched.
   *
   * @return the container list taken from the scheduler's response
   * @throws YarnRemoteException if the allocate call fails
   */
  protected List<Container> makeRemoteRequest() throws YarnRemoteException {
    // applicationId and scheduler are not declared in this class; presumably
    // they are inherited from RMCommunicator.
    ApplicationStatus status = recordFactory
        .newRecordInstance(ApplicationStatus.class);
    status.setApplicationId(applicationId);
    status.setResponseId(lastResponseID);

    AllocateRequest allocateRequest = recordFactory
        .newRecordInstance(AllocateRequest.class);
    allocateRequest.setApplicationStatus(status);
    // Hand over copies so that clearing the sets below cannot affect the
    // request object.
    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
    allocateRequest.addAllReleases(new ArrayList<Container>(release));
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
    AMResponse response = allocateResponse.getAMResponse();
    lastResponseID = response.getResponseId();
    List<Container> allContainers = response.getContainerList();

    // Capture the counts BEFORE clearing; logging ask.size()/release.size()
    // after clear() would always report 0.
    int numAsks = ask.size();
    int numReleases = release.size();
    ask.clear();
    release.clear();

    LOG.info("getResources() for " + applicationId + ":" + " ask="
        + numAsks + " release= " + numReleases + " recieved="
        + allContainers.size());
    return allContainers;
  }

  /**
   * Registers one container request: one entry per requested host, one per
   * requested rack, and one for {@link #ANY}.
   *
   * @param req the request whose hosts, racks, priority and capability are used
   */
  protected void addContainerReq(ContainerRequestEvent req) {
    // Data-local
    for (String host : req.getHosts()) {
      addResourceRequest(req.getPriority(), host, req.getCapability());
    }

    // Rack-local
    for (String rack : req.getRacks()) {
      addResourceRequest(req.getPriority(), rack, req.getCapability());
    }

    // Off-switch
    addResourceRequest(req.getPriority(), ANY, req.getCapability());
  }

  /**
   * Reverses {@link #addContainerReq(ContainerRequestEvent)}: decrements the
   * entry for every host and rack of the request and the one for {@link #ANY}.
   *
   * @param req the request whose hosts, racks, priority and capability are used
   */
  protected void decContainerReq(ContainerRequestEvent req) {
    for (String hostName : req.getHosts()) {
      decResourceRequest(req.getPriority(), hostName, req.getCapability());
    }

    for (String rack : req.getRacks()) {
      decResourceRequest(req.getPriority(), rack, req.getCapability());
    }

    decResourceRequest(req.getPriority(), ANY, req.getCapability());
  }

  /**
   * Increments the container count of the request identified by the given
   * priority, resource name and capability, creating the table entries on
   * first use, and records the request in {@code ask}.
   */
  private void addResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
        this.remoteRequestsTable.get(priority);
    if (remoteRequests == null) {
      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
      this.remoteRequestsTable.put(priority, remoteRequests);
      LOG.info("Added priority=" + priority);
    }
    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
    if (reqMap == null) {
      reqMap = new HashMap<Resource, ResourceRequest>();
      remoteRequests.put(resourceName, reqMap);
    }
    ResourceRequest remoteRequest = reqMap.get(capability);
    if (remoteRequest == null) {
      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
      remoteRequest.setPriority(priority);
      remoteRequest.setHostName(resourceName);
      remoteRequest.setCapability(capability);
      remoteRequest.setNumContainers(0);
      reqMap.put(capability, remoteRequest);
    }
    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);

    // Note this down for next interaction with ResourceManager
    ask.add(remoteRequest);
    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
        + ask.size());
  }

  /**
   * Decrements the container count of the request identified by the given
   * priority, resource name and capability. When the count reaches zero the
   * request is dropped from the table (pruning maps that become empty) and
   * from {@code ask}; otherwise it is recorded in {@code ask}.
   *
   * <p>If no such request is registered, a warning is logged and nothing is
   * changed.
   */
  private void decResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
        this.remoteRequestsTable.get(priority);
    Map<Resource, ResourceRequest> reqMap =
        (remoteRequests == null) ? null : remoteRequests.get(resourceName);
    ResourceRequest remoteRequest =
        (reqMap == null) ? null : reqMap.get(capability);
    if (remoteRequest == null) {
      // Nothing registered under this key: skip it instead of failing with a
      // NullPointerException on the lookups below.
      LOG.warn("decResourceRequest: no outstanding request for priority="
          + priority + " resourceName=" + resourceName + "; ignoring");
      return;
    }

    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
        + ask.size());

    remoteRequest.setNumContainers(remoteRequest.getNumContainers() - 1);
    if (remoteRequest.getNumContainers() == 0) {
      reqMap.remove(capability);
      if (reqMap.isEmpty()) {
        remoteRequests.remove(resourceName);
      }
      if (remoteRequests.isEmpty()) {
        remoteRequestsTable.remove(priority);
      }
      // Drop it from ask in case it is still pending there.
      ask.remove(remoteRequest);
    } else {
      // Set.add is a no-op when the set already holds the element; the call
      // matters when ask has been cleared since the request was last queued.
      // The updated count is carried by the same object, which was mutated
      // above.
      ask.add(remoteRequest);
    }

    LOG.info("AFTER decResourceRequest:" + " applicationId="
        + applicationId.getId() + " priority=" + priority.getPriority()
        + " resourceName=" + resourceName + " numContainers="
        + remoteRequest.getNumContainers() + " #asks=" + ask.size());
  }

  /**
   * Queues a container to be released on the next
   * {@link #makeRemoteRequest()} call.
   *
   * @param container the container to give back
   */
  protected void release(Container container) {
    release.add(container);
  }

}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1125273&r1=1125272&r2=1125273&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri May
20 08:09:30 2011 @@ -33,12 +33,15 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -175,7 +178,7 @@ public class TestRMContainerAllocator { allocator.sendRequest(event1); //send 1 more request with different priority - ContainerRequestEvent event2 = createReq(2, 2048, 2, new String[]{"h1"}); + ContainerRequestEvent event2 = createReq(2, 3000, 2, new String[]{"h1"}); allocator.sendRequest(event2); //send 1 more request with different priority @@ -256,8 +259,19 @@ public class TestRMContainerAllocator { private ContainerRequestEvent createReq( int attemptid, int memory, int priority, String[] hosts) { + ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); + appId.setClusterTimestamp(0); + appId.setId(0); + JobId jobId = recordFactory.newRecordInstance(JobId.class); + jobId.setAppId(appId); + jobId.setId(0); + TaskId taskId = recordFactory.newRecordInstance(TaskId.class); + taskId.setId(0); + taskId.setJobId(jobId); + taskId.setTaskType(TaskType.MAP); TaskAttemptId attemptId = recordFactory.newRecordInstance(TaskAttemptId.class); attemptId.setId(attemptid); + attemptId.setTaskId(taskId); 
Resource containerNeed = recordFactory.newRecordInstance(Resource.class); containerNeed.setMemory(memory); return new ContainerRequestEvent(attemptId, @@ -384,7 +398,8 @@ public class TestRMContainerAllocator { try { heartbeat(); } catch (Exception e) { - + LOG.error("error in heartbeat ", e); + throw new YarnException(e); } List result = new ArrayList(events);