hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1130172 [2/2] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resou...
Date Wed, 01 Jun 2011 14:53:16 GMT
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeInfo.java
Wed Jun  1 14:53:14 2011
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
 
-import java.util.Collection;
+
 import java.util.List;
 
 import org.apache.hadoop.net.Node;
@@ -41,46 +41,74 @@ public interface NodeInfo {
    * @return the node id of this node.
    */
   public NodeId getNodeID();
+  
+  /**
+   * the hostname of this node
+   * @return hostname of this node
+   */
+  public String getNodeHostName();
+  
+  /**
+   * the command port for this node
+   * @return command port for this node
+   */
+  public int getCommandPort();
+  
+  /**
+   * the http port for this node
+   * @return http port for this node
+   */
+  public int getHttpPort();
+
+
   /**
    * the ContainerManager address for this node.
    * @return the ContainerManager address for this node.
    */
   public String getNodeAddress();
+  
   /**
    * the http-Address for this node.
    * @return the http-url address for this node
    */
   public String getHttpAddress();
+  
   /**
    * the health-status for this node
    * @return the health-status for this node.
    */
   public NodeHealthStatus getNodeHealthStatus();
+  
   /**
    * the total available resource.
    * @return the total available resource.
    */
   public org.apache.hadoop.yarn.api.records.Resource getTotalCapability();
+  
   /**
    * The rack name for this node manager.
    * @return the rack name.
    */
   public String getRackName();
+  
   /**
    * the {@link Node} information for this node.
    * @return {@link Node} information for this node.
    */
   public Node getNode();
+  
   /**
    * the available resource for this node.
    * @return the available resource this node.
    */
   public org.apache.hadoop.yarn.api.records.Resource getAvailableResource();
+  
   /**
    * used resource on this node.
    * @return the used resource on this node.
    */
   public org.apache.hadoop.yarn.api.records.Resource getUsedResource();
+  
   /**
    * The current number of containers for this node
    * @return the number of containers

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/NodeTracker.java
Wed Jun  1 14:53:14 2011
@@ -36,4 +36,6 @@ public interface NodeTracker {
   throws IOException;
   
   public void unregisterNodeManager(NodeId nodeId) throws IOException;
+  
+  public void refreshNodes() throws IOException;
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/RMResourceTrackerImpl.java
Wed Jun  1 14:53:14 2011
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +38,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -76,6 +78,7 @@ NodeTracker, ClusterTracker {
   private final Map<NodeId, NodeInfoTracker> nodeManagers = 
     new ConcurrentHashMap<NodeId, NodeInfoTracker>();
   private final NMLivelinessMonitor nmLivelinessMonitor;
+  private HostsFileReader hostsReader;
 
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
 
@@ -104,7 +107,9 @@ NodeTracker, ClusterTracker {
   private long nmExpiryInterval;
   private final NodeStore nodeStore;
   
-  public RMResourceTrackerImpl(ContainerTokenSecretManager containerTokenSecretManager, RMContext
context) {
+  public RMResourceTrackerImpl(
+      ContainerTokenSecretManager containerTokenSecretManager, 
+      RMContext context) {
     super(RMResourceTrackerImpl.class.getName());
     reboot.setReboot(true);
     this.containerTokenSecretManager = containerTokenSecretManager;
@@ -114,14 +119,54 @@ NodeTracker, ClusterTracker {
 
   @Override
   public void init(Configuration conf) {
+    super.init(conf);
     this.nmExpiryInterval =  conf.getLong(RMConfig.NM_EXPIRY_INTERVAL, 
         RMConfig.DEFAULT_NM_EXPIRY_INTERVAL);
     this.nmLivelinessMonitor.setMonitoringInterval(conf.getLong(
         RMConfig.NMLIVELINESS_MONITORING_INTERVAL,
         RMConfig.DEFAULT_NMLIVELINESS_MONITORING_INTERVAL));
-    super.init(conf);
+    
+    // Read the hosts/exclude files to restrict access to the RM
+    try {
+      this.hostsReader = 
+        new HostsFileReader(
+            conf.get(RMConfig.RM_NODES_INCLUDE_FILE, 
+                RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE),
+            conf.get(RMConfig.RM_NODES_EXCLUDE_FILE, 
+                RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE)
+                );
+      printConfiguredHosts();
+    } catch (IOException ioe) {
+      LOG.warn("Failed to init hostsReader, disabling", ioe);
+      try {
+        this.hostsReader = 
+          new HostsFileReader(RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE, 
+              RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE);
+      } catch (IOException ioe2) {
+        // Should *never* happen
+        this.hostsReader = null;
+      }
+    }
   }
 
+  private void printConfiguredHosts() {
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    
+    Configuration conf = getConfig();
+    LOG.debug("hostsReader: in=" + conf.get(RMConfig.RM_NODES_INCLUDE_FILE, 
+        RMConfig.DEFAULT_RM_NODES_INCLUDE_FILE) + " out=" +
+        conf.get(RMConfig.RM_NODES_EXCLUDE_FILE, 
+            RMConfig.DEFAULT_RM_NODES_EXCLUDE_FILE));
+    for (String include : hostsReader.getHosts()) {
+      LOG.debug("include: " + include);
+    }
+    for (String exclude : hostsReader.getExcludedHosts()) {
+      LOG.debug("exclude: " + exclude);
+    }
+  }
+  
   @Override
   public void addListener(ResourceListener listener) {
     this.resourceListener = listener;
@@ -143,17 +188,17 @@ NodeTracker, ClusterTracker {
     return new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
   }
   
-  protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId,
-      String hostString, String httpAddress, Node node, Resource capability) {
+  protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId, 
+      String hostName, int cmPort, int httpPort,
+      Node node, Resource capability) {
     NodeInfoTracker nTracker = null;
     
     synchronized(nodeManagers) {
       if (!nodeManagers.containsKey(nodeId)) {
-        LOG.info("DEBUG -- Adding  " + hostString);
+        LOG.info("DEBUG -- Adding  " + hostName);
         NodeManager nodeManager =
-          new NodeManagerImpl(nodeId, hostString, httpAddress,
-              node,
-              capability);
+          new NodeManagerImpl(nodeId, hostName, cmPort, httpPort, 
+              node, capability);
         nodes.put(nodeManager.getNodeAddress(), nodeId);
         addNode(nodeManager);
         HeartbeatResponse response = recordFactory.newRecordInstance(HeartbeatResponse.class);
@@ -182,18 +227,23 @@ NodeTracker, ClusterTracker {
   public RegistrationResponse registerNodeManager(
       String host, int cmPort, int httpPort, Resource capability) 
   throws IOException {
-    String node = host + ":" + cmPort;
-    String httpAddress = host + ":" + httpPort;
+    // Check if this node is a 'valid' node
+    if (!isValidNode(host)) {
+      LOG.info("Disallowed NodeManager from  " + host);
+      throw new IOException("Disallowed NodeManager from  " + host); 
+    }
 
+    String node = host + ":" + cmPort;
     NodeId nodeId = getNodeId(node);
     
     NodeInfoTracker nTracker = null;
     nTracker = 
-      getAndAddNodeInfoTracker(nodeId, node, httpAddress, 
-          resolve(node), capability);
+      getAndAddNodeInfoTracker(nodeId, host, cmPort, httpPort, 
+          resolve(host), capability);
     addForTracking(nTracker.getlastHeartBeat());
-    LOG.info("NodeManager from node " + node + "(web-url: " + httpAddress
-        + ") registered with capability: " + capability.getMemory()
+    LOG.info("NodeManager from node " + host + 
+        "(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
+        + "registered with capability: " + capability.getMemory()
         + ", assigned nodeId " + nodeId.getId());
 
     RegistrationResponse regResponse = recordFactory.newRecordInstance(
@@ -230,8 +280,13 @@ NodeTracker, ClusterTracker {
     return nodeManager.statusUpdate(containers);
   }
   
-  private boolean isValidNode(NodeId nodeId) {
-    return true;
+  private boolean isValidNode(String hostName) {
+    synchronized (hostsReader) {
+      Set<String> hostsList = hostsReader.getHosts();
+      Set<String> excludeList = hostsReader.getExcludedHosts();
+      return ((hostsList.isEmpty() || hostsList.contains(hostName)) && 
+          !excludeList.contains(hostName));
+    }
   }
   
   @Override
@@ -264,8 +319,9 @@ NodeTracker, ClusterTracker {
     nTracker.setLastHeartBeatTime();
     
     // 2. Check if it's a valid (i.e. not excluded) node
-    // TODO - Check for valid/invalid node from hosts list
-    if (!isValidNode(nodeId)) {
+    if (!isValidNode(nTracker.getNodeManager().getNodeHostName())) {
+      LOG.info("Disallowed NodeManager nodeId: " + nodeId +  
+          " hostname: " + nTracker.getNodeManager().getNodeAddress());
       unregisterNodeManager(remoteNodeStatus.getNodeId());
       throw new IOException("Disallowed NodeManager nodeId: " + 
           remoteNodeStatus.getNodeId());
@@ -346,10 +402,14 @@ NodeTracker, ClusterTracker {
   @Override
   public void unregisterNodeManager(NodeId nodeId) {
     synchronized (nodeManagers) {
-      NodeManager node = nodeManagers.get(nodeId).getNodeManager();
-      removeNode(node);
-      nodeManagers.remove(nodeId);
-      nodes.remove(node.getNodeAddress());
+      NodeManager node = getNodeManager(nodeId);
+      if (node != null) {
+        removeNode(node);
+        nodeManagers.remove(nodeId);
+        nodes.remove(node.getNodeAddress());
+      } else {
+        LOG.warn("Unknown node " + nodeId + " unregistered");
+      }
     }
   }
 
@@ -365,12 +425,6 @@ NodeTracker, ClusterTracker {
     }
   }
   
-  @Private
-  public synchronized NodeInfo getNodeManager(NodeId nodeId) {
-    NodeInfoTracker ntracker = nodeManagers.get(nodeId);
-    return (ntracker == null ? null: ntracker.getNodeManager());
-  }
-
   private  NodeId getNodeId(String node) {
     NodeId nodeId;
     nodeId = nodes.get(node);
@@ -503,40 +557,54 @@ NodeTracker, ClusterTracker {
   public void finishedApplication(ApplicationId applicationId,
       List<NodeInfo> nodesToNotify) {
     for (NodeInfo info: nodesToNotify) {
-      NodeManager node = null;
-      synchronized(nodeManagers) {
-        NodeInfoTracker nodeInfo = nodeManagers.get(info.getNodeID());
-        if (nodeInfo != null) {
-          node = nodeInfo.getNodeManager();
-        }
-      }
+      NodeManager node = getNodeManager(info.getNodeID());
       if (node != null) {
         node.finishedApplication(applicationId);
       }
     } 
   }
+  
+  @Private
+  public NodeManager getNodeManager(NodeId nodeId) {
+    if (nodeId == null) {
+      LOG.info("getNodeManager called with nodeId=null");
+      return null;
+    }
+    
+    NodeManager nodeManager = null;
+    synchronized (nodeManagers) {
+      NodeInfoTracker node = nodeManagers.get(nodeId);
+      if (node != null) {
+        nodeManager = node.getNodeManager();
+      }
+    }
+    return nodeManager;
+  }
 
   private NodeManager getNodeManagerForContainer(Container container) {
     NodeManager node;
     synchronized (nodeManagers) {
-      LOG.info("DEBUG -- Container manager address " + container.getContainerManagerAddress());
+      LOG.info("DEBUG -- Container manager address " + 
+          container.getContainerManagerAddress());
       NodeId nodeId = nodes.get(container.getContainerManagerAddress());
-      node = nodeManagers.get(nodeId).getNodeManager();
+      node = getNodeManager(nodeId);
     }
     return node;
   }
+  
   @Override
   public  boolean releaseContainer(Container container) {
     NodeManager node = getNodeManagerForContainer(container);
-    return node.releaseContainer(container);
+    return ((node != null) && node.releaseContainer(container));
   }
   
   @Override
   public void recover(RMState state) {
     List<NodeManager> nodeManagers = state.getStoredNodeManagers();
     for (NodeManager nm: nodeManagers) {
-      getAndAddNodeInfoTracker(nm.getNodeID(), nm.getNodeAddress(), 
-          nm.getHttpAddress(), nm.getNode(), nm.getTotalCapability());
+      getAndAddNodeInfoTracker(nm.getNodeID(), nm.getNodeHostName(),
+          nm.getCommandPort(), nm.getHttpPort(),
+          nm.getNode(), nm.getTotalCapability());
     }
     for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet())
{
       List<Container> containers = entry.getValue().getContainers();
@@ -550,4 +618,12 @@ NodeTracker, ClusterTracker {
     }
   }
 
+  @Override
+  public void refreshNodes() throws IOException {
+    synchronized (hostsReader) {
+      hostsReader.refresh();
+      printConfiguredHosts();
+    }
+  }
+
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
Wed Jun  1 14:53:14 2011
@@ -55,6 +55,9 @@ public class NodeManagerImpl implements 
   private static final Log LOG = LogFactory.getLog(NodeManager.class);
   private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   private final NodeId nodeId;
+  private final String hostName;
+  private final int commandPort;
+  private final int httpPort;
   private final String nodeAddress; // The containerManager address
   private final String httpAddress;
   private Resource totalCapability;
@@ -83,12 +86,16 @@ public class NodeManagerImpl implements 
   
   private volatile int numContainers;
   
-  public NodeManagerImpl(NodeId nodeId, String nodeAddress,
-      String httpAddress, Node node, Resource capability) {
+  public NodeManagerImpl(NodeId nodeId, String hostName, 
+      int cmPort, int httpPort,
+      Node node, Resource capability) {
     this.nodeId = nodeId;   
+    this.hostName = hostName;
+    this.commandPort = cmPort;
+    this.httpPort = httpPort;
     this.totalCapability = capability; 
-    this.nodeAddress = nodeAddress;
-    this.httpAddress = httpAddress;
+    this.nodeAddress = hostName + ":" + cmPort;
+    this.httpAddress = hostName + ":" + httpPort;
     Resources.addTo(availableResource, capability);
     this.node = node;
   }
@@ -101,6 +108,21 @@ public class NodeManagerImpl implements 
     return this;
   }
   
+  @Override
+  public String getNodeHostName() {
+    return hostName;
+  }
+
+  @Override
+  public int getCommandPort() {
+    return commandPort;
+  }
+
+  @Override
+  public int getHttpPort() {
+    return httpPort;
+  }
+
   /**
    * The Scheduler has allocated containers on this node to the 
    * given application.

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
Wed Jun  1 14:53:14 2011
@@ -178,15 +178,6 @@ implements ResourceScheduler, CapacitySc
 
   private synchronized void reinitializeQueues(CapacitySchedulerConfiguration conf) 
   throws IOException {
-    // Check access to ensure the user can run rmadmin -refreshQueues
-    UserGroupInformation user = UserGroupInformation.getCurrentUser();
-    if (!root.hasAccess(QueueACL.ADMINISTER_QUEUES, user)) {
-      LOG.warn("User " + user.getShortUserName() + " doesn't have permission" +
-      		" to re-initialize queues!");
-      throw new AccessControlException("User " + user.getShortUserName() + 
-          " doesn't have permission to re-initialize queues!");
-    }
-    
     // Parse new queues
     Map<String, Queue> newQueues = new HashMap<String, Queue>();
     Queue newRoot = parseQueue(conf, null, ROOT, newQueues, queues);
@@ -599,6 +590,8 @@ implements ResourceScheduler, CapacitySc
 
   @Override
   public synchronized void removeNode(NodeInfo nodeInfo) {
+    LOG.info("Removing node " + nodeInfo.getNodeAddress());
+    
     Resources.subtractFrom(clusterResource, nodeInfo.getTotalCapability());
     --numNodeManagers;
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/tools/RMAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/tools/RMAdmin.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/tools/RMAdmin.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/tools/RMAdmin.java
Wed Jun  1 14:53:14 2011
@@ -19,7 +19,11 @@ import org.apache.hadoop.yarn.ipc.YarnRP
 import org.apache.hadoop.yarn.security.admin.AdminSecurityInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.RMConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.api.RMAdminProtocol;
+import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshNodesRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
 
 public class RMAdmin extends Configured implements Tool {
 
@@ -37,19 +41,43 @@ public class RMAdmin extends Configured 
   private static void printHelp(String cmd) {
     String summary = "rmadmin is the command to execute Map-Reduce administrative commands.\n"
+
     "The full syntax is: \n\n" +
-    "hadoop rmadmin [-refreshQueues] " +
-    "[-help [cmd]]\n";
+    "hadoop rmadmin" +
+      " [-refreshQueues]" +
+      " [-refreshNodes]" +
+      " [-refreshSuperUserGroupsConfiguration]" +
+      " [-refreshUserToGroupsMappings]" +
+      " [-refreshAdminAcls]" +
+      " [-help [cmd]]\n";
 
     String refreshQueues =
       "-refreshQueues: Reload the queues' acls, states and "
       + "scheduler specific properties.\n"
-      + "\t\tJobTracker will reload the mapred-queues configuration file.\n";
+      + "\t\tResourceManager will reload the mapred-queues configuration file.\n";
 
+    String refreshNodes = 
+      "-refreshNodes: Refresh the hosts information at the ResourceManager.\n";
+    
+    String refreshUserToGroupsMappings = 
+      "-refreshUserToGroupsMappings: Refresh user-to-groups mappings\n";
+    
+    String refreshSuperUserGroupsConfiguration = 
+      "-refreshSuperUserGroupsConfiguration: Refresh superuser proxy groups mappings\n";
+
+    String refreshAdminAcls =
+      "-refreshAdminAcls: Refresh acls for administration of ResourceManager\n";
     String help = "-help [cmd]: \tDisplays help for the given command or all commands if
none\n" +
     "\t\tis specified.\n";
 
     if ("refreshQueues".equals(cmd)) {
       System.out.println(refreshQueues);
+    }  else if ("refreshNodes".equals(cmd)) {
+      System.out.println(refreshNodes);
+    } else if ("refreshUserToGroupsMappings".equals(cmd)) {
+      System.out.println(refreshUserToGroupsMappings);
+    } else if ("refreshSuperUserGroupsConfiguration".equals(cmd)) {
+      System.out.println(refreshSuperUserGroupsConfiguration);
+    } else if ("refreshAdminAcls".equals(cmd)) {
+      System.out.println(refreshAdminAcls);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -68,9 +96,21 @@ public class RMAdmin extends Configured 
   private static void printUsage(String cmd) {
     if ("-refreshQueues".equals(cmd)) {
       System.err.println("Usage: java RMAdmin" + " [-refreshQueues]");
+    } else if ("-refreshNodes".equals(cmd)){
+      System.err.println("Usage: java RMAdmin" + " [-refreshNodes]");
+    } else if ("-refreshUserToGroupsMappings".equals(cmd)){
+      System.err.println("Usage: java RMAdmin" + " [-refreshUserToGroupsMappings]");
+    } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)){
+      System.err.println("Usage: java RMAdmin" + " [-refreshSuperUserGroupsConfiguration]");
+    } else if ("-refreshAdminAcls".equals(cmd)){
+      System.err.println("Usage: java RMAdmin" + " [-refreshAdminAcls]");
     } else {
       System.err.println("Usage: java RMAdmin");
       System.err.println("           [-refreshQueues]");
+      System.err.println("           [-refreshNodes]");
+      System.err.println("           [-refreshUserToGroupsMappings]");
+      System.err.println("           [-refreshSuperUserGroupsConfiguration]");
+      System.err.println("           [-refreshAdminAcls]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -82,7 +122,7 @@ public class RMAdmin extends Configured 
     return UserGroupInformation.getCurrentUser();
   }
 
-  private int refreshQueues() throws IOException {
+  private RMAdminProtocol createAdminProtocol() throws IOException {
     // Get the current configuration
     final YarnConfiguration conf = new YarnConfiguration(getConf());
 
@@ -106,14 +146,54 @@ public class RMAdmin extends Configured 
         }
       });
 
+    return adminProtocol;
+  }
+  
+  private int refreshQueues() throws IOException {
     // Refresh the queue properties
+    RMAdminProtocol adminProtocol = createAdminProtocol();
     RefreshQueuesRequest request = 
       recordFactory.newRecordInstance(RefreshQueuesRequest.class);
     adminProtocol.refreshQueues(request);
-
     return 0;
   }
 
+  private int refreshNodes() throws IOException {
+    // Refresh the nodes
+    RMAdminProtocol adminProtocol = createAdminProtocol();
+    RefreshNodesRequest request = 
+      recordFactory.newRecordInstance(RefreshNodesRequest.class);
+    adminProtocol.refreshNodes(request);
+    return 0;
+  }
+  
+  private int refreshUserToGroupsMappings() throws IOException {
+    // Refresh the user-to-groups mappings
+    RMAdminProtocol adminProtocol = createAdminProtocol();
+    RefreshUserToGroupsMappingsRequest request = 
+      recordFactory.newRecordInstance(RefreshUserToGroupsMappingsRequest.class);
+    adminProtocol.refreshUserToGroupsMappings(request);
+    return 0;
+  }
+  
+  private int refreshSuperUserGroupsConfiguration() throws IOException {
+    // Refresh the super-user groups
+    RMAdminProtocol adminProtocol = createAdminProtocol();
+    RefreshSuperUserGroupsConfigurationRequest request = 
+      recordFactory.newRecordInstance(RefreshSuperUserGroupsConfigurationRequest.class);
+    adminProtocol.refreshSuperUserGroupsConfiguration(request);
+    return 0;
+  }
+  
+  private int refreshAdminAcls() throws IOException {
+    // Refresh the admin acls
+    RMAdminProtocol adminProtocol = createAdminProtocol();
+    RefreshAdminAclsRequest request = 
+      recordFactory.newRecordInstance(RefreshAdminAclsRequest.class);
+    adminProtocol.refreshAdminAcls(request);
+    return 0;
+  }
+  
   @Override
   public int run(String[] args) throws Exception {
     if (args.length < 1) {
@@ -127,7 +207,10 @@ public class RMAdmin extends Configured 
     //
     // verify that we have enough command line parameters
     //
-    if ("-refreshQueues".equals(cmd)) {
+    if ("-refreshAdminAcls".equals(cmd) || "-refreshQueues".equals(cmd) ||
+        "-refreshNodes".equals(cmd) ||
+        "-refreshUserToGroupsMappings".equals(cmd) ||
+        "-refreshSuperUserGroupsConfiguration".equals(cmd)) {
       if (args.length != 1) {
         printUsage(cmd);
         return exitCode;
@@ -138,6 +221,14 @@ public class RMAdmin extends Configured 
     try {
       if ("-refreshQueues".equals(cmd)) {
         exitCode = refreshQueues();
+      } else if ("-refreshNodes".equals(cmd)) {
+        exitCode = refreshNodes();
+      } else if ("-refreshUserToGroupsMappings".equals(cmd)) {
+        exitCode = refreshUserToGroupsMappings();
+      } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
+        exitCode = refreshSuperUserGroupsConfiguration();
+      } else if ("-refreshAdminAcls".equals(cmd)) {
+        exitCode = refreshAdminAcls();
       } else if ("-help".equals(cmd)) {
         if (i < args.length) {
           printUsage(args[i]);

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/RMAdminProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/RMAdminProtocol.proto?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/RMAdminProtocol.proto
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/RMAdminProtocol.proto
Wed Jun  1 14:53:14 2011
@@ -7,4 +7,8 @@ import "yarn_server_resourcemanager_serv
 
 service RMAdminProtocolService {
   rpc refreshQueues(RefreshQueuesRequestProto) returns (RefreshQueuesResponseProto);
+  rpc refreshNodes(RefreshNodesRequestProto) returns (RefreshNodesResponseProto);
+  rpc refreshSuperUserGroupsConfiguration(RefreshSuperUserGroupsConfigurationRequestProto)
returns (RefreshSuperUserGroupsConfigurationResponseProto);
+  rpc refreshUserToGroupsMappings(RefreshUserToGroupsMappingsRequestProto) returns (RefreshUserToGroupsMappingsResponseProto);
+  rpc refreshAdminAcls(RefreshAdminAclsRequestProto) returns (RefreshAdminAclsResponseProto);
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_service_protos.proto?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_service_protos.proto
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/proto/yarn_server_resourcemanager_service_protos.proto
Wed Jun  1 14:53:14 2011
@@ -8,3 +8,23 @@ message RefreshQueuesRequestProto {
 }
 message RefreshQueuesResponseProto {
 }
+
+message RefreshNodesRequestProto {
+}
+message RefreshNodesResponseProto {
+}
+
+message RefreshSuperUserGroupsConfigurationRequestProto {
+}
+message RefreshSuperUserGroupsConfigurationResponseProto {
+}
+
+message RefreshUserToGroupsMappingsRequestProto {
+}
+message RefreshUserToGroupsMappingsResponseProto {
+}
+
+message RefreshAdminAclsRequestProto {
+}
+message RefreshAdminAclsResponseProto {
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
Wed Jun  1 14:53:14 2011
@@ -167,6 +167,22 @@ public class MockNodes {
         // TODO Auto-generated method stub
         return null;
       }
+
+      @Override
+      public int getCommandPort() {
+        return nid;
+      }
+
+      @Override
+      public int getHttpPort() {
+        // TODO Auto-generated method stub
+        return 0;
+      }
+
+      @Override
+      public String getNodeHostName() {
+        return hostName;
+      }
     };
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java?rev=1130172&r1=1130171&r2=1130172&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
Wed Jun  1 14:53:14 2011
@@ -94,8 +94,8 @@ public class TestApplicationCleanup exte
     
     @Override
     protected NodeInfoTracker getAndAddNodeInfoTracker(NodeId nodeId,
-        String hostString, String httpAddress, Node node, Resource capability) {
-      return super.getAndAddNodeInfoTracker(nodeId, hostString, httpAddress, node, capability);
+        String host, int cmPort, int httpPort, Node node, Resource capability) {
+      return super.getAndAddNodeInfoTracker(nodeId, host, cmPort, httpPort, node, capability);
     }
     
     @Override
@@ -243,7 +243,7 @@ public class TestApplicationCleanup exte
     Node node = new NodeBase(hostName, NetworkTopology.DEFAULT_RACK);
     Resource capability = recordFactory.newRecordInstance(Resource.class);
     capability.setMemory(1024);
-    NodeInfoTracker nTracker = clusterTracker.getAndAddNodeInfoTracker(nodeId, hostName,
"localhost:0", node, capability);
+    NodeInfoTracker nTracker = clusterTracker.getAndAddNodeInfoTracker(nodeId, hostName,
i, -1, node, capability);
     return nTracker.getNodeManager();
   }
 



Mime
View raw message