hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject svn commit: r1378420 - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/
Date Wed, 29 Aug 2012 01:46:48 GMT
Author: sseth
Date: Wed Aug 29 01:46:47 2012
New Revision: 1378420

URL: http://svn.apache.org/viewvc?rev=1378420&view=rev
Log:
MAPREDUCE-4602. [MR-3902] Re-create ask list correctly in case of a temporary error in the
AM-RM allocate call (sseth)

Modified:
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
    hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1378420&r1=1378419&r2=1378420&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Wed Aug 29
01:46:47 2012
@@ -1,2 +1,4 @@
 Branch MR-3902
   MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase (Tsuyoshi OZAWA
via sseth)
+
+  MAPREDUCE-4602. Re-create ask list correctly in case of a temporary error in the AM-RM
allocate call (sseth)

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java?rev=1378420&r1=1378419&r2=1378420&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
Wed Aug 29 01:46:47 2012
@@ -132,7 +132,7 @@ public abstract class RMCommunicator ext
     return this.job.getProgress();
   }
 
-  // TODO XXX: Get rid of the dependencies on the ClientService.
+  // TODO (After 3902): Get rid of the dependencies on the ClientService.
   protected void register() {
     //Register
     InetSocketAddress serviceAddr = clientService.getBindAddress();

Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java?rev=1378420&r1=1378419&r2=1378420&view=diff
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
(original)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMContainerRequestor.java
Wed Aug 29 01:46:47 2012
@@ -30,6 +30,7 @@ import java.util.concurrent.locks.Reentr
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.app2.AppContext;
@@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.util.Recor
 /**
  * Keeps the data structures to send container requests to RM.
  */
+// TODO XXX: Eventually rename to RMCommunicator
 public class RMContainerRequestor extends RMCommunicator implements EventHandler<RMCommunicatorEvent>
{
   
   private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
@@ -74,10 +76,7 @@ public class RMContainerRequestor extend
   private int numContainersAllocated;
   private int numFinishedContainers; // Not very useful.
   
-  // TODO XXX: Lots of cleanup.
-  
-  // TODO XXX: Maintain some statistics on containers allocated, released etc.
-  
+
   //Key -> Priority
   //Value -> Map
   //  Key->ResourceName (e.g., hostname, rackname, *)
@@ -89,20 +88,19 @@ public class RMContainerRequestor extend
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
 
   private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
-  private Set<ContainerId> release = new TreeSet<ContainerId>();
+  private final Set<ContainerId> release = new TreeSet<ContainerId>();
   
   private Lock releaseLock = new ReentrantLock();
   private Lock askLock = new ReentrantLock();
   private final List<ContainerId> emptyReleaseList = new ArrayList<ContainerId>(0);
   private final List<ResourceRequest> emptyAskList = new ArrayList<ResourceRequest>();
   
-  // TODO XXX: May need to pass this to the NodeManager.
+  // TODO XXX: May need to pass this to the AMNodeMap
   private int clusterNmCount = 0;
   
   // TODO XXX Consider allowing sync comm between the requestor and allocator... 
   
-  // TODO XXX: Why does the RMRequestor require the ClientService ??
-  // TODO XXX: Get rid of the clock. Available from the context.
+  // TODO (after MR-3902): Why does the RMRequestor require the ClientService? (Only for the RPC address; get rid of this dependency.)
   public RMContainerRequestor(ClientService clientService, AppContext context) {
     super(clientService, context);
     this.clock = context.getClock();
@@ -138,12 +136,12 @@ public class RMContainerRequestor extend
   @Override
   public void init(Configuration conf) {
     super.init(conf);
-
     retrystartTime = clock.getTime();
-    retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
+    retryInterval = getConfig().getLong(
+        MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
         MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
   }
-  
+
   public void stop(Configuration conf) {
     LOG.info("NumAllocatedContainers: " + numContainersAllocated
            + "NumFinihsedContainers: " + numFinishedContainers
@@ -188,6 +186,11 @@ public class RMContainerRequestor extend
 
   private void addResourceRequest(Priority priority, String resourceName,
       Resource capability) {
+    addResourceRequest(priority, resourceName, capability, 1);
+  }
+  
+  private void addResourceRequest(Priority priority, String resourceName,
+      Resource capability, int increment) {
     Map<String, Map<Resource, ResourceRequest>> remoteRequests =
       this.remoteRequestsTable.get(priority);
     if (remoteRequests == null) {
@@ -211,7 +214,8 @@ public class RMContainerRequestor extend
       remoteRequest.setNumContainers(0);
       reqMap.put(capability, remoteRequest);
     }
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
+    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + increment);
+    // An increment of 0 is a special case: the count is unchanged, but the request is re-added to the ask set.
 
     // Note this down for next interaction with ResourceManager
     int askSize = 0;
@@ -300,15 +304,18 @@ public class RMContainerRequestor extend
   @SuppressWarnings("unchecked")
   @Override
   protected void heartbeat() throws Exception {
-    int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory()
: 0;//first time it would be null
+    int headRoom = getAvailableResources() != null ? getAvailableResources()
+        .getMemory() : 0;// first time it would be null
     AMResponse response = errorCheckedMakeRemoteRequest();
     
-    int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory()
: 0;
+    int newHeadRoom = getAvailableResources() != null ? getAvailableResources()
+        .getMemory() : 0;
     List<Container> newContainers = response.getAllocatedContainers();    
     logNewContainers(newContainers);
     numContainersAllocated += newContainers.size();
     
-    List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
+    List<ContainerStatus> finishedContainers = response
+        .getCompletedContainersStatuses();
     logFinishedContainers(finishedContainers);
     numFinishedContainers += finishedContainers.size();
     
@@ -325,9 +332,9 @@ public class RMContainerRequestor extend
     if (newContainers.size() > 0) {
       newContainerIds = new ArrayList<ContainerId>(newContainers.size());
       for (Container container : newContainers) {
+        // TODO XXX Re-factor AMNodes and AMContainers.
         context.getAllContainers().addNewContainer(container);
-        newContainerIds.add(container.getId());
-        // TODO XXX: Maybe send this out as an Asynchrounous event ? | Scheduler events will
also have to be async in that case, and the order is critical. 
+        newContainerIds.add(container.getId()); 
         context.getAllNodes().nodeSeen(container.getNodeId());
       }
       eventHandler.handle(new AMSchedulerEventContainersAllocated(
@@ -359,7 +366,6 @@ public class RMContainerRequestor extend
                                          JobEventType.INTERNAL_ERROR));
         throw new YarnException("Could not contact RM after " +
                                 retryInterval + " milliseconds.");
-        // TODO XXX: Some changes to the exception handling -> e is ignored, YarnException
causes an exit. 
       }
       // Throw this up to the caller, which may decide to ignore it and
       // continue to attempt to contact the RM.
@@ -411,7 +417,7 @@ public class RMContainerRequestor extend
   public void handle(RMCommunicatorEvent rawEvent) {
     switch(rawEvent.getType()) {
     case CONTAINER_DEALLOCATE:
-      RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent)rawEvent;
+      RMCommunicatorContainerDeAllocateRequestEvent event = (RMCommunicatorContainerDeAllocateRequestEvent)
rawEvent;
       releaseLock.lock();
       try {
         numContainerReleaseRequests++;
@@ -472,15 +478,22 @@ public class RMContainerRequestor extend
     }
     askLock.lock();
     try {
-      ask.addAll(clonedAskList);
-      // TODO XXX: Asks cannot be populated like this. Would be better to
-      // iterate over the asks and get corresponding requests from the main
-      // table.
+      // Asks for a particular resource could have changed (increased or
+      // decreased) during the failure. Re-pull the requests from the
+      // remoteRequestsTable. ask is a Set (a TreeSet) and the same request
+      // objects are re-added, so no duplicates are created.
+      rePopulateAskList(clonedAskList);
     } finally {
       askLock.unlock();
     }
   }
   
+  private void rePopulateAskList(List<ResourceRequest> clonedAskList) {
+    for (ResourceRequest rr : clonedAskList) {
+      addResourceRequest(rr.getPriority(), rr.getHostName(),
+          rr.getCapability(), 0);
+    }
+  }
 
   private void logNewContainers(List<Container> newContainers) {
     if (newContainers.size() > 0) {
@@ -508,4 +521,19 @@ public class RMContainerRequestor extend
       }
     }
   }
+
+  @Private
+  Map<Priority, Map<String, Map<Resource, ResourceRequest>>> getRemoteRequestTable()
{
+    return remoteRequestsTable;
+  }
+
+  @Private
+  Set<ResourceRequest> getAskSet() {
+    return ask;
+  }
+
+  @Private
+  Set<ContainerId> getReleaseSet() {
+    return release;
+  }
 }



Mime
View raw message