hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1189723 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/s...
Date Thu, 27 Oct 2011 12:44:57 GMT
Author: vinodkv
Date: Thu Oct 27 12:44:57 2011
New Revision: 1189723

URL: http://svn.apache.org/viewvc?rev=1189723&view=rev
Log:
MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers. Contributed by Anupam Seth.
svn merge -c r1189721 --ignore-ancestry ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Oct 27 12:44:57 2011
@@ -383,6 +383,9 @@ Release 0.23.0 - Unreleased
     virtual, allowing for a ratio between the two to be configurable. (todd
     via acmurthy) 
 
+    MAPREDUCE-2986. Fixed MiniYARNCluster to support multiple NodeManagers.
+    (Anupam Seth via vinodkv)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026. Make JobTracker.getJobCounters() and

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/MiniMRYarnCluster.java Thu Oct 27 12:44:57 2011
@@ -58,7 +58,11 @@ public class MiniMRYarnCluster extends M
   private JobHistoryServerWrapper historyServerWrapper;
 
   public MiniMRYarnCluster(String testName) {
-    super(testName);
+    this(testName, 1);
+  }
+  
+  public MiniMRYarnCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs);
     //TODO: add the history server
     historyServerWrapper = new JobHistoryServerWrapper();
     addService(historyServerWrapper);
@@ -80,7 +84,7 @@ public class MiniMRYarnCluster extends M
         Service.class);
 
     // Non-standard shuffle port
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 8083);
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
 
     conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
         DefaultContainerExecutor.class, ContainerExecutor.class);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java Thu Oct 27 12:44:57 2011
@@ -102,7 +102,7 @@ public class TestMRJobs {
     }
 
     if (mrCluster == null) {
-      mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName());
+      mrCluster = new MiniMRYarnCluster(TestMRJobs.class.getName(), 3);
       Configuration conf = new Configuration();
       mrCluster.init(conf);
       mrCluster.start();
@@ -322,7 +322,7 @@ public class TestMRJobs {
     return job;
   }
 
-//@Test
+  //@Test
   public void testSleepJobWithSecurityOn() throws IOException,
       InterruptedException, ClassNotFoundException {
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Thu Oct 27 12:44:57 2011
@@ -249,9 +249,14 @@ public class ShuffleHandler extends Abst
   public synchronized void start() {
     Configuration conf = getConfig();
     ServerBootstrap bootstrap = new ServerBootstrap(selector);
-    bootstrap.setPipelineFactory(new HttpPipelineFactory(conf));
+    HttpPipelineFactory pipelineFact = new HttpPipelineFactory(conf);
+    bootstrap.setPipelineFactory(pipelineFact);
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    accepted.add(bootstrap.bind(new InetSocketAddress(port)));
+    Channel ch = bootstrap.bind(new InetSocketAddress(port));
+    accepted.add(ch);
+    port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+    conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
+    pipelineFact.SHUFFLE.setPort(port);
     LOG.info(getName() + " listening on port " + port);
     super.start();
   }
@@ -304,13 +309,17 @@ public class ShuffleHandler extends Abst
     private final IndexCache indexCache;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
-    private final int port;
+    private int port;
 
     public Shuffle(Configuration conf) {
       this.conf = conf;
       indexCache = new IndexCache(new JobConf(conf));
       this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     }
+    
+    public void setPort(int port) {
+      this.port = port;
+    }
 
     private List<String> splitMaps(List<String> mapq) {
       if (null == mapq) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Thu Oct 27 12:44:57 2011
@@ -89,8 +89,9 @@ public class DefaultContainerExecutor ex
     String tokenFn = String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId);
     Path tokenDst = new Path(appStorageDir, tokenFn);
     lfs.util().copy(nmPrivateContainerTokensPath, tokenDst);
+    LOG.info("Copying from " + nmPrivateContainerTokensPath + " to " + tokenDst);
     lfs.setWorkingDirectory(appStorageDir);
-
+    LOG.info("CWD set to " + appStorageDir + " = " + lfs.getWorkingDirectory());
     // TODO: DO it over RPC for maintaining similarity?
     localizer.runLocalization(nmAddr);
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Thu Oct 27 12:44:57 2011
@@ -235,8 +235,15 @@ public class ResourceLocalizationService
     cacheCleanup.scheduleWithFixedDelay(new CacheCleanup(dispatcher),
         cacheCleanupPeriod, cacheCleanupPeriod, TimeUnit.MILLISECONDS);
     server = createServer();
-    LOG.info("Localizer started on port " + server.getPort());
     server.start();
+    String host = getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS)
+        .split(":")[0];
+    getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, host + ":" 
+        + server.getPort());
+    localizationServerAddress = NetUtils.createSocketAddr(
+        getConfig().get(YarnConfiguration.NM_LOCALIZER_ADDRESS, 
+            YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS));
+    LOG.info("Localizer started on port " + server.getPort());
     super.start();
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Thu Oct 27 12:44:57 2011
@@ -147,7 +147,7 @@ public class TestResourceLocalizationSer
   @Test
   @SuppressWarnings("unchecked") // mocked generics
   public void testResourceRelease() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new YarnConfiguration();
     AbstractFileSystem spylfs =
       spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@@ -331,7 +331,7 @@ public class TestResourceLocalizationSer
   @Test
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new YarnConfiguration();
     AbstractFileSystem spylfs =
       spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     final FileContext lfs = FileContext.getFileContext(spylfs, conf);

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Thu Oct 27 12:44:57 2011
@@ -292,6 +292,7 @@ public class ApplicationMasterService ex
   public void registerAppAttempt(ApplicationAttemptId attemptId) {
     AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
     response.setResponseId(0);
+    LOG.info("Registering " + attemptId);
     responseMap.put(attemptId, response);
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1189723&r1=1189722&r2=1189723&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Thu Oct 27 12:44:57 2011
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service.STATE;
 
 public class MiniYARNCluster extends CompositeService {
 
@@ -60,15 +62,19 @@ public class MiniYARNCluster extends Com
     DefaultMetricsSystem.setMiniClusterMode(true);
   }
 
-  private NodeManager nodeManager;
+  private NodeManager[] nodeManagers;
   private ResourceManager resourceManager;
 
   private ResourceManagerWrapper resourceManagerWrapper;
-  private NodeManagerWrapper nodeManagerWrapper;
   
   private File testWorkDir;
 
   public MiniYARNCluster(String testName) {
+    //default number of nodeManagers = 1
+    this(testName, 1);
+  }
+
+  public MiniYARNCluster(String testName, int noOfNodeManagers) {
     super(testName);
     this.testWorkDir = new File("target", testName);
     try {
@@ -80,8 +86,11 @@ public class MiniYARNCluster extends Com
     } 
     resourceManagerWrapper = new ResourceManagerWrapper();
     addService(resourceManagerWrapper);
-    nodeManagerWrapper = new NodeManagerWrapper();
-    addService(nodeManagerWrapper);
+    nodeManagers = new CustomNodeManager[noOfNodeManagers];
+    for(int index = 0; index < noOfNodeManagers; index++) {
+      addService(new NodeManagerWrapper(index));
+      nodeManagers[index] = new CustomNodeManager();
+    }
   }
 
   public File getTestWorkDir() {
@@ -92,10 +101,10 @@ public class MiniYARNCluster extends Com
     return this.resourceManager;
   }
 
-  public NodeManager getNodeManager() {
-    return this.nodeManager;
+  public NodeManager getNodeManager(int i) {
+    return this.nodeManagers[i];
   }
-
+  
   private class ResourceManagerWrapper extends AbstractService {
     public ResourceManagerWrapper() {
       super(ResourceManagerWrapper.class.getName());
@@ -145,106 +154,60 @@ public class MiniYARNCluster extends Com
   }
 
   private class NodeManagerWrapper extends AbstractService {
-    public NodeManagerWrapper() {
-      super(NodeManagerWrapper.class.getName());
+    int index = 0;
+
+    public NodeManagerWrapper(int i) {
+      super(NodeManagerWrapper.class.getName() + "_" + i);
+      index = i;
     }
 
+    public synchronized void init(Configuration conf) {                          
+      Configuration config = new Configuration(conf);                            
+      super.init(config);                                                        
+    }                                                                            
+
     public synchronized void start() {
       try {
-        File localDir =
-            new File(testWorkDir, MiniYARNCluster.this.getName() + "-localDir");
+        File localDir = new File(testWorkDir, MiniYARNCluster.this.getName()
+            + "-localDir-nm-" + index);
         localDir.mkdir();
         LOG.info("Created localDir in " + localDir.getAbsolutePath());
-        getConfig().set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
+        getConfig().set(YarnConfiguration.NM_LOCAL_DIRS,
+            localDir.getAbsolutePath());
         File logDir =
             new File(testWorkDir, MiniYARNCluster.this.getName()
-                + "-logDir");
+                + "-logDir-nm-" + index);
         File remoteLogDir =
-          new File(testWorkDir, MiniYARNCluster.this.getName()
-              + "-remoteLogDir");
+            new File(testWorkDir, MiniYARNCluster.this.getName()
+                + "-remoteLogDir-nm-" + index);
         logDir.mkdir();
         remoteLogDir.mkdir();
         LOG.info("Created logDir in " + logDir.getAbsolutePath());
-        getConfig().set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
+        getConfig().set(YarnConfiguration.NM_LOG_DIRS,
+            logDir.getAbsolutePath());
         getConfig().set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            remoteLogDir.getAbsolutePath());
-        getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024); // By default AM + 2 containers
-        nodeManager = new NodeManager() {
-
-          @Override
-          protected void doSecureLogin() throws IOException {
-            // Don't try to login using keytab in the testcase.
-          };
-
-          @Override
-          protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-              Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
-              ContainerTokenSecretManager containerTokenSecretManager) {
-            return new NodeStatusUpdaterImpl(context, dispatcher,
-                healthChecker, metrics, containerTokenSecretManager) {
-              @Override
-              protected ResourceTracker getRMClient() {
-                final ResourceTrackerService rt = resourceManager
-                    .getResourceTrackerService();
-                final RecordFactory recordFactory =
-                  RecordFactoryProvider.getRecordFactory(null);
-
-                // For in-process communication without RPC
-                return new ResourceTracker() {
-
-                  @Override
-                  public NodeHeartbeatResponse nodeHeartbeat(
-                      NodeHeartbeatRequest request) throws YarnRemoteException {
-                    NodeHeartbeatResponse response = recordFactory.newRecordInstance(
-                        NodeHeartbeatResponse.class);
-                    try {
-                      response.setHeartbeatResponse(rt.nodeHeartbeat(request)
-                          .getHeartbeatResponse());
-                    } catch (IOException ioe) {
-                      LOG.info("Exception in heartbeat from node " + 
-                          request.getNodeStatus().getNodeId(), ioe);
-                      throw RPCUtil.getRemoteException(ioe);
-                    }
-                    return response;
-                  }
-
-                  @Override
-                  public RegisterNodeManagerResponse registerNodeManager(
-                      RegisterNodeManagerRequest request)
-                      throws YarnRemoteException {
-                    RegisterNodeManagerResponse response = recordFactory.newRecordInstance(
-                        RegisterNodeManagerResponse.class);
-                    try {
-                      response.setRegistrationResponse(rt
-                          .registerNodeManager(request)
-                          .getRegistrationResponse());
-                    } catch (IOException ioe) {
-                      LOG.info("Exception in node registration from "
-                          + request.getNodeId().toString(), ioe);
-                      throw RPCUtil.getRemoteException(ioe);
-                    }
-                    return response;
-                  }
-                };
-              };
-            };
-          };
-        };
-        nodeManager.init(getConfig());
+            	remoteLogDir.getAbsolutePath());
+        // By default AM + 2 containers
+        getConfig().setInt(YarnConfiguration.NM_PMEM_MB, 4*1024);
+        getConfig().set(YarnConfiguration.NM_ADDRESS, "0.0.0.0:0");
+        getConfig().set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "0.0.0.0:0");
+        getConfig().set(YarnConfiguration.NM_WEBAPP_ADDRESS, "0.0.0.0:0");
+        LOG.info("Starting NM: " + index);
+        nodeManagers[index].init(getConfig());
         new Thread() {
           public void run() {
-            nodeManager.start();
+            nodeManagers[index].start();
           };
         }.start();
         int waitCount = 0;
-        while (nodeManager.getServiceState() == STATE.INITED
+        while (nodeManagers[index].getServiceState() == STATE.INITED
             && waitCount++ < 60) {
-          LOG.info("Waiting for NM to start...");
+          LOG.info("Waiting for NM " + index + " to start...");
           Thread.sleep(1000);
         }
-        if (nodeManager.getServiceState() != STATE.STARTED) {
+        if (nodeManagers[index].getServiceState() != STATE.STARTED) {
           // RM could have failed.
-          throw new IOException("NodeManager failed to start");
+          throw new IOException("NodeManager " + index + " failed to start");
         }
         super.start();
       } catch (Throwable t) {
@@ -254,10 +217,71 @@ public class MiniYARNCluster extends Com
 
     @Override
     public synchronized void stop() {
-      if (nodeManager != null) {
-        nodeManager.stop();
+      if (nodeManagers[index] != null) {
+        nodeManagers[index].stop();
       }
       super.stop();
     }
   }
+  
+  private class CustomNodeManager extends NodeManager {
+    @Override
+    protected void doSecureLogin() throws IOException {
+      // Don't try to login using keytab in the testcase.
+    };
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+        ContainerTokenSecretManager containerTokenSecretManager) {
+      return new NodeStatusUpdaterImpl(context, dispatcher,
+          healthChecker, metrics, containerTokenSecretManager) {
+        @Override
+        protected ResourceTracker getRMClient() {
+          final ResourceTrackerService rt = resourceManager
+              .getResourceTrackerService();
+          final RecordFactory recordFactory =
+            RecordFactoryProvider.getRecordFactory(null);
+
+          // For in-process communication without RPC
+          return new ResourceTracker() {
+
+            @Override
+            public NodeHeartbeatResponse nodeHeartbeat(
+                NodeHeartbeatRequest request) throws YarnRemoteException {
+              NodeHeartbeatResponse response = recordFactory.newRecordInstance(
+                  NodeHeartbeatResponse.class);
+              try {
+                response.setHeartbeatResponse(rt.nodeHeartbeat(request)
+                    .getHeartbeatResponse());
+              } catch (IOException ioe) {
+                LOG.info("Exception in heartbeat from node " + 
+                    request.getNodeStatus().getNodeId(), ioe);
+                throw RPCUtil.getRemoteException(ioe);
+              }
+              return response;
+            }
+
+            @Override
+            public RegisterNodeManagerResponse registerNodeManager(
+                RegisterNodeManagerRequest request)
+                throws YarnRemoteException {
+              RegisterNodeManagerResponse response = recordFactory.
+                  newRecordInstance(RegisterNodeManagerResponse.class);
+              try {
+                response.setRegistrationResponse(rt
+                    .registerNodeManager(request)
+                    .getRegistrationResponse());
+              } catch (IOException ioe) {
+                LOG.info("Exception in node registration from "
+                    + request.getNodeId().toString(), ioe);
+                throw RPCUtil.getRemoteException(ioe);
+              }
+              return response;
+            }
+          };
+        };
+      };
+    };
+  }
 }



Mime
View raw message