hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1133421 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apa...
Date Wed, 08 Jun 2011 15:19:15 GMT
Author: acmurthy
Date: Wed Jun  8 15:19:15 2011
New Revision: 1133421

URL: http://svn.apache.org/viewvc?rev=1133421&view=rev
Log:
Fixed a deadlock that occurred while expiring NodeManagers (NMs).

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1133421&r1=1133420&r2=1133421&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed Jun  8 15:19:15 2011
@@ -5,6 +5,8 @@ Trunk (unreleased changes)
 
     MAPREDUCE-279
 
+    Fixed deadlock during expiring NMs. (acmurthy)
+
     Re-enabling Uber-AM feature. (vinodkv)
  
     Fully resolve paths when launching containers. (Siddharth Seth)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1133421&r1=1133420&r2=1133421&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
Wed Jun  8 15:19:15 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.net.NetworkTopo
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceListener;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -118,6 +120,7 @@ NodeTracker, ClusterTracker {
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public void init(Configuration conf) {
     super.init(conf);
     this.nmExpiryInterval =  conf.getLong(RMConfig.NM_EXPIRY_INTERVAL, 
@@ -168,11 +171,13 @@ NodeTracker, ClusterTracker {
   }
   
   @Override
+  @Lock(Lock.NoLock.class)
   public void addListener(ResourceListener listener) {
     this.resourceListener = listener;
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public void start() {
     this.nmLivelinessMonitor.start();
     LOG.info("Expiry interval of NodeManagers set to " + nmExpiryInterval);
@@ -184,46 +189,52 @@ NodeTracker, ClusterTracker {
    * @param hostName the hostname of this node.
    * @return the resolved {@link Node} for this nodemanager.
    */
+  @Lock(Lock.NoLock.class)
   public static Node resolve(String hostName) {
     return new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
   }
   
+  @Lock(Lock.NoLock.class)
   protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId, 
       String hostName, int cmPort, int httpPort,
       Node node, Resource capability) {
     NodeInfoTracker nTracker = null;
+    NodeManager nodeManager =
+      new NodeManagerImpl(nodeId, hostName, cmPort, httpPort, 
+          node, capability);
+
+    // Inform listeners and nodeStore
+    addNode(nodeManager);
     
+    // Record the new node
     synchronized(nodeManagers) {
-      if (!nodeManagers.containsKey(nodeId)) {
-        LOG.info("DEBUG -- Adding  " + hostName);
-        NodeManager nodeManager =
-          new NodeManagerImpl(nodeId, hostName, cmPort, httpPort, 
-              node, capability);
-        nodes.put(nodeManager.getNodeAddress(), nodeId);
-        addNode(nodeManager);
-        HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
-        response.setResponseId(0);
-        nTracker = new NodeInfoTracker(nodeManager, response);
-        nodeManagers.put(nodeId, nTracker);
-      } else {
-        nTracker = nodeManagers.get(nodeId);
-      }
+      LOG.info("DEBUG -- Adding  " + hostName);
+      nodes.put(nodeManager.getNodeAddress(), nodeId);
+      HeartbeatResponse response = 
+        recordFactory.newRecordInstance(HeartbeatResponse.class);
+      response.setResponseId(0);
+      nTracker = new NodeInfoTracker(nodeManager, response);
+      nodeManagers.put(nodeId, nTracker);
     }
+    
     return nTracker;
   }
 
+  @Lock(Lock.NoLock.class)
   private void addNode(NodeManager node) {
-    /* Inform the listeners */
+    // Inform the listeners
     resourceListener.addNode(node);
 
+    // Inform the node store
     try {
       nodeStore.storeNode(node);
     } catch (IOException ioe) {
       LOG.warn("Failed to store node " + node.getNodeAddress() + " in nodestore");
     }
-
   }
+  
   @Override
+  @Lock(Lock.NoLock.class)
   public RegistrationResponse registerNodeManager(
       String host, int cmPort, int httpPort, Resource capability) 
   throws IOException {
@@ -261,6 +272,7 @@ NodeTracker, ClusterTracker {
    * @param nodeManager the {@link NodeInfo} to update.
    * @param containers the containers from the status of the node manager.
    */
+  @Lock(Lock.NoLock.class)
   protected void updateListener(NodeInfo nodeManager, Map<String, List<Container>>
     containers) {
   /* inform any listeners of node heartbeats */
@@ -280,6 +292,7 @@ NodeTracker, ClusterTracker {
     return nodeManager.statusUpdate(containers);
   }
   
+  @Lock(Lock.NoLock.class)
   private boolean isValidNode(String hostName) {
     synchronized (hostsReader) {
       Set<String> hostsList = hostsReader.getHosts();
@@ -290,6 +303,7 @@ NodeTracker, ClusterTracker {
   }
   
   @Override
+  @Lock(Lock.NoLock.class)
   public HeartbeatResponse nodeHeartbeat(org.apache.hadoop.yarn.server.api.records.NodeStatus
remoteNodeStatus) 
   throws IOException {
     /**
@@ -306,10 +320,7 @@ NodeTracker, ClusterTracker {
     NodeId nodeId = remoteNodeStatus.getNodeId();
     
     // 1. Check if it's a registered node
-    NodeInfoTracker nTracker = null;
-    synchronized(nodeManagers) {
-      nTracker = nodeManagers.get(nodeId);
-    }
+    NodeInfoTracker nTracker = getNodeInfoTracker(nodeId);
     if (nTracker == null) {
       /* node does not exist */
       LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
@@ -369,13 +380,6 @@ NodeTracker, ClusterTracker {
       }
     }
 
-    /** TODO This should be 3 step process.
-     * nodemanager.statusupdate
-     * listener.update()
-     * nodemanager.getNodeResponse()
-     * This will allow flexibility in updates/scheduling/premption
-     */
-
     // Heartbeat response
     HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
     response.setResponseId(nTracker.getLastHeartBeatResponse().getResponseId() + 1);
@@ -400,31 +404,37 @@ NodeTracker, ClusterTracker {
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public void unregisterNodeManager(NodeId nodeId) {
+    NodeManager node = null;  
     synchronized (nodeManagers) {
-      NodeManager node = getNodeManager(nodeId);
+      node = getNodeManager(nodeId);
       if (node != null) {
-        removeNode(node);
         nodeManagers.remove(nodeId);
         nodes.remove(node.getNodeAddress());
       } else {
         LOG.warn("Unknown node " + nodeId + " unregistered");
       }
     }
+    
+    // Inform the listeners and nodeStore
+    if (node != null) {
+      removeNode(node);
+    }
   }
 
+  @Lock(Lock.NoLock.class)
   private void removeNode(NodeManager node) {
-    synchronized (nodeManagers) {
-      resourceListener.removeNode(node);
-      try {
-        nodeStore.removeNode(node);
-      } catch (IOException ioe) {
-        LOG.warn("Failed to remove node " + node.getNodeAddress() + 
-            " from nodeStore", ioe);
-      }
+    resourceListener.removeNode(node);
+    try {
+      nodeStore.removeNode(node);
+    } catch (IOException ioe) {
+      LOG.warn("Failed to remove node " + node.getNodeAddress() + 
+          " from nodeStore", ioe);
     }
   }
   
+  @Lock(Lock.NoLock.class)
   private  NodeId getNodeId(String node) {
     NodeId nodeId;
     nodeId = nodes.get(node);
@@ -436,6 +446,7 @@ NodeTracker, ClusterTracker {
   }
 
   @Override
+  @Lock(RMResourceTrackerImpl.class)
   public synchronized YarnClusterMetrics getClusterMetrics() {
     YarnClusterMetrics ymetrics = recordFactory.newRecordInstance(YarnClusterMetrics.class);
     ymetrics.setNumNodeManagers(nodeManagers.size());
@@ -443,6 +454,7 @@ NodeTracker, ClusterTracker {
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public void stop() {
     this.nmLivelinessMonitor.interrupt();
     this.nmLivelinessMonitor.shutdown();
@@ -456,6 +468,7 @@ NodeTracker, ClusterTracker {
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public List<NodeInfo> getAllNodeInfo() {
     List<NodeInfo> infoList = new ArrayList<NodeInfo>();
     synchronized (nodeManagers) {
@@ -466,12 +479,14 @@ NodeTracker, ClusterTracker {
     return infoList;
   }
 
+  @Lock(Lock.NoLock.class)
   protected void addForTracking(NodeHeartbeatStatus nmStatus) {
     synchronized(nmExpiryQueue) {
       nmExpiryQueue.add(nmStatus);
     }
   }
 
+  @Lock(Lock.NoLock.class)
   protected void expireNMs(List<NodeId> nodes) {
     for (NodeId id: nodes) {
       unregisterNodeManager(id);
@@ -516,10 +531,7 @@ NodeTracker, ClusterTracker {
               ((now - leastRecent.getLastSeen()) > 
               nmExpiryInterval)) {
             nmExpiryQueue.remove(leastRecent);
-            NodeInfoTracker info;
-            synchronized(nodeManagers) {
-              info = nodeManagers.get(leastRecent.getNodeId());
-            }
+            NodeInfoTracker info = getNodeInfoTracker(leastRecent.getNodeId());
             if (info == null) {
               continue;
             }
@@ -581,6 +593,14 @@ NodeTracker, ClusterTracker {
     return nodeManager;
   }
 
+  private NodeInfoTracker getNodeInfoTracker(NodeId nodeId) {
+    NodeInfoTracker node = null;
+    synchronized (nodeManagers) {
+      node = nodeManagers.get(nodeId);
+    }
+    return node;
+  }
+  
   private NodeManager getNodeManagerForContainer(Container container) {
     NodeManager node;
     synchronized (nodeManagers) {
@@ -593,6 +613,7 @@ NodeTracker, ClusterTracker {
   }
   
   @Override
+  @Lock({YarnScheduler.class})
   public  boolean releaseContainer(Container container) {
     NodeManager node = getNodeManagerForContainer(container);
     return ((node != null) && node.releaseContainer(container));

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1133421&r1=1133420&r2=1133421&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
Wed Jun  8 15:19:15 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.Lock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationMaster;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -177,12 +178,14 @@ implements ResourceScheduler, CapacitySc
   public static final String ROOT_QUEUE = 
     CapacitySchedulerConfiguration.PREFIX + ROOT;
 
+  @Lock(CapacityScheduler.class)
   private void initializeQueues(CapacitySchedulerConfiguration conf) {
     root = parseQueue(conf, null, ROOT, queues, queues);
     LOG.info("Initialized root queue " + root);
   }
 
-  private synchronized void reinitializeQueues(CapacitySchedulerConfiguration conf) 
+  @Lock(CapacityScheduler.class)
+  private void reinitializeQueues(CapacitySchedulerConfiguration conf) 
   throws IOException {
     // Parse new queues
     Map<String, Queue> newQueues = new HashMap<String, Queue>();
@@ -203,6 +206,7 @@ implements ResourceScheduler, CapacitySc
    * @param queues existing queues
    * @param newQueues new queues
    */
+  @Lock(CapacityScheduler.class)
   private void validateExistingQueues(
       Map<String, Queue> queues, Map<String, Queue> newQueues) 
   throws IOException {
@@ -219,6 +223,7 @@ implements ResourceScheduler, CapacitySc
    * @param queues
    * @param newQueues
    */
+  @Lock(CapacityScheduler.class)
   private void addNewQueues(
       Map<String, Queue> queues, Map<String, Queue> newQueues) 
   {
@@ -231,6 +236,7 @@ implements ResourceScheduler, CapacitySc
     }
   }
   
+  @Lock(CapacityScheduler.class)
   private Queue parseQueue(CapacitySchedulerConfiguration conf, 
       Queue parent, String queueName, Map<String, Queue> queues,
       Map<String, Queue> oldQueues) {
@@ -330,16 +336,21 @@ implements ResourceScheduler, CapacitySc
      * structures if the finishApplication flag is set.
      */
     if (finishApplication) {
+      // Inform the queue
       Queue queue = queues.get(application.getQueue().getQueueName());
       queue.finishApplication(application, queue.getQueueName());
-      finishedApplication(applicationId, 
+      
+      // Inform the resource-tracker
+      clusterTracker.finishedApplication(applicationId, 
           application.getAllNodesForApplication());
+      
       // Remove from our data-structure
       applications.remove(applicationId);
     }
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public Allocation allocate(ApplicationId applicationId,
       List<ResourceRequest> ask, List<Container> release)
       throws IOException {
@@ -386,6 +397,7 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public QueueInfo getQueueInfo(String queueName, 
       boolean includeApplications, boolean includeChildQueues, boolean recursive) 
   throws IOException {
@@ -402,6 +414,7 @@ implements ResourceScheduler, CapacitySc
   }
 
   @Override
+  @Lock(Lock.NoLock.class)
   public List<QueueUserACLInfo> getQueueUserAclInfo() {
     UserGroupInformation user = null;
     try {
@@ -414,12 +427,14 @@ implements ResourceScheduler, CapacitySc
     return root.getQueueUserAclInfo(user);
   }
 
+  @Lock(Lock.NoLock.class)
   private void normalizeRequests(List<ResourceRequest> asks) {
     for (ResourceRequest ask : asks) {
       normalizeRequest(ask);
     }
   }
 
+  @Lock(Lock.NoLock.class)
   private void normalizeRequest(ResourceRequest ask) {
     int minMemory = minimumAllocation.getMemory();
     int memory = Math.max(ask.getCapability().getMemory(), minMemory);
@@ -427,7 +442,9 @@ implements ResourceScheduler, CapacitySc
         minMemory * ((memory/minMemory) + (memory%minMemory > 0 ? 1 : 0)));
   }
 
-  private List<Container> getCompletedContainers(Map<String, List<Container>>
allContainers) {
+  @Lock(CapacityScheduler.class)
+  private List<Container> getCompletedContainers(
+      Map<String, List<Container>> allContainers) {
     if (allContainers == null) {
       return new ArrayList<Container>();
     }
@@ -483,7 +500,8 @@ implements ResourceScheduler, CapacitySc
 
   }
 
-  private synchronized void killRunningContainers(List<Container> containers) {
+  @Lock(CapacityScheduler.class)
+  private void killRunningContainers(List<Container> containers) {
     for (Container container : containers) {
       container.setState(ContainerState.COMPLETE);
       LOG.info("Killing running container " + container.getId());
@@ -492,7 +510,8 @@ implements ResourceScheduler, CapacitySc
     }
   }
   
-  private synchronized void processCompletedContainers(
+  @Lock(Lock.NoLock.class)
+  private void processCompletedContainers(
       List<Container> completedContainers) {
     for (Container container: completedContainers) {
       Application application = getApplication(container.getId().getAppId());
@@ -509,6 +528,7 @@ implements ResourceScheduler, CapacitySc
     }
   }
 
+  @Lock(Lock.NoLock.class)
   private synchronized void processReleasedContainers(Application application,
       List<Container> releasedContainers) {
     // Inform the application
@@ -528,7 +548,8 @@ implements ResourceScheduler, CapacitySc
     processCompletedContainers(unusedContainers);
   }
 
-  private synchronized void releaseReservedContainers(Application application) {
+  @Lock(CapacityScheduler.class)
+  private void releaseReservedContainers(Application application) {
     LOG.info("Releasing reservations for completed application: " + 
         application.getApplicationId());
     Queue queue = queues.get(application.getQueue().getQueueName());
@@ -548,6 +569,7 @@ implements ResourceScheduler, CapacitySc
     }
   }
   
+  @Lock(Lock.NoLock.class)
   private Application getApplication(ApplicationId applicationId) {
     return applications.get(applicationId);
   }
@@ -617,7 +639,8 @@ implements ResourceScheduler, CapacitySc
         " clusterResource: " + clusterResource);
   }
   
-  public synchronized boolean releaseContainer(ApplicationId applicationId, 
+  @Lock(CapacityScheduler.class)
+  private boolean releaseContainer(ApplicationId applicationId, 
       Container container) {
     // Reap containers
     LOG.info("Application " + applicationId + " released container " + container);
@@ -625,12 +648,8 @@ implements ResourceScheduler, CapacitySc
   }
 
 
-  public synchronized void finishedApplication(ApplicationId applicationId,
-      List<NodeInfo> nodesToNotify) {
-    clusterTracker.finishedApplication(applicationId, nodesToNotify);
-  }
-
   @Override
+  @Lock(Lock.NoLock.class)
   public void recover(RMState state) throws Exception {
     applications.clear();
     for (Map.Entry<ApplicationId, ApplicationInfo> entry : state.getStoredApplications().entrySet())
{



Mime
View raw message