hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1196458 [14/19] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-cl...
Date Wed, 02 Nov 2011 05:35:03 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Wed Nov  2 05:34:31 2011
@@ -24,10 +24,10 @@ import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -47,24 +47,26 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 
 public class DummyContainerManager extends ContainerManagerImpl {
 
   private static final Log LOG = LogFactory
       .getLog(DummyContainerManager.class);
-
-  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   
   public DummyContainerManager(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
-      NodeManagerMetrics metrics, ContainerTokenSecretManager containerTokenSecretManager) {
-    super(context, exec, deletionContext, nodeStatusUpdater, metrics, containerTokenSecretManager);
+      NodeManagerMetrics metrics,
+      ContainerTokenSecretManager containerTokenSecretManager,
+      ApplicationACLsManager applicationACLsManager) {
+    super(context, exec, deletionContext, nodeStatusUpdater, metrics,
+        containerTokenSecretManager, applicationACLsManager);
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
       DeletionService deletionContext) {
     return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
@@ -120,6 +122,7 @@ public class DummyContainerManager exten
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
     return new ContainersLauncher(context, super.dispatcher, exec) {
@@ -144,22 +147,23 @@ public class DummyContainerManager exten
   }
 
   @Override
-  protected LogAggregationService createLogAggregationService(Context context, 
-      DeletionService deletionService) {
-    return new LogAggregationService(context, deletionService) {
+  protected LogHandler createLogHandler(Configuration conf,
+      Context context, DeletionService deletionService) {
+    return new LogHandler() {
+      
       @Override
-      public void handle(LogAggregatorEvent event) {
+      public void handle(LogHandlerEvent event) {
         switch (event.getType()) {
-        case APPLICATION_STARTED:
-          break;
-        case CONTAINER_FINISHED:
-          break;
-        case APPLICATION_FINISHED:
-          break;
-        default:
-          // Ignore
-        }
+          case APPLICATION_STARTED:
+            break;
+          case CONTAINER_FINISHED:
+            break;
+          case APPLICATION_FINISHED:
+            break;
+          default:
+            // Ignore
+          }
       }
     };
   }
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Wed Nov  2 05:34:31 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.event.Asyn
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
@@ -97,9 +98,9 @@ public class TestEventFlow {
       }
     };
 
-    DummyContainerManager containerManager =
-        new DummyContainerManager(context, exec, del, nodeStatusUpdater, 
-            metrics, containerTokenSecretManager);
+    DummyContainerManager containerManager = new DummyContainerManager(
+        context, exec, del, nodeStatusUpdater, metrics,
+        containerTokenSecretManager, new ApplicationACLsManager(conf));
     containerManager.init(conf);
     containerManager.start();
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java Wed Nov  2 05:34:31 2011
@@ -18,151 +18,231 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
-import java.io.BufferedReader;
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
+import java.io.FileOutputStream;
 import java.io.IOException;
-
-import junit.framework.Assert;
+import java.io.PrintWriter;
+import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnsupportedFileSystemException;
-import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+/**
+ * This is intended to test the LinuxContainerExecutor code, but because of
+ * some security restrictions this can only be done with some special setup
+ * first.
+ * <br><ol>
+ * <li>Compile the code with container-executor.conf.dir set to the location you
+ * want for testing.
+ * <br><pre><code>
+ * > mvn clean install -Pnative -Dcontainer-executor.conf.dir=/etc/hadoop
+ *                          -DskipTests
+ * </code></pre>
+ * 
+ * <li>Set up <code>${container-executor.conf.dir}/container-executor.cfg</code>
+ * container-executor.cfg needs to be owned by root and have in it the proper
+ * config values.
+ * <br><pre><code>
+ * > cat /etc/hadoop/container-executor.cfg
+ * yarn.nodemanager.local-dirs=/tmp/hadoop/nm-local/
+ * yarn.nodemanager.log-dirs=/tmp/hadoop/nm-log
+ * yarn.nodemanager.linux-container-executor.group=mapred
+ * #depending on the user id of the application.submitter option
+ * min.user.id=1
+ * > sudo chown root:root /etc/hadoop/container-executor.cfg
+ * > sudo chmod 444 /etc/hadoop/container-executor.cfg
+ * </code></pre>
+ * 
+ * <li>Move the binary and set proper permissions on it. It needs to be owned 
+ * by root, the group needs to be the group configured in container-executor.cfg, 
+ * and it needs the setuid bit set. (The build will also overwrite it so you
+ * need to move it to a place where you can protect it.)
+ * <br><pre><code>
+ * > cp ./hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/container-executor /tmp/
+ * > sudo chown root:mapred /tmp/container-executor
+ * > sudo chmod 4550 /tmp/container-executor
+ * </code></pre>
+ * 
+ * <li>Run the tests with the execution enabled (The user you run the tests as
+ * needs to be part of the group from the config.)
+ * <br><pre><code>
+ * mvn test -Dtest=TestLinuxContainerExecutor -Dapplication.submitter=nobody -Dcontainer-executor.path=/tmp/container-executor
+ * </code></pre>
+ * </ol>
+ */
 public class TestLinuxContainerExecutor {
-//
-//  private static final Log LOG = LogFactory
-//      .getLog(TestLinuxContainerExecutor.class);
-//
-//  // TODO: FIXME
-//  private static File workSpace = new File("target",
-//      TestLinuxContainerExecutor.class.getName() + "-workSpace");
-//
-//  @Before
-//  public void setup() throws IOException {
-//    FileContext.getLocalFSFileContext().mkdir(
-//        new Path(workSpace.getAbsolutePath()), null, true);
-//    workSpace.setReadable(true, false);
-//    workSpace.setExecutable(true, false);
-//    workSpace.setWritable(true, false);
-//  }
-//
-//  @After
-//  public void tearDown() throws AccessControlException, FileNotFoundException,
-//      UnsupportedFileSystemException, IOException {
-//    FileContext.getLocalFSFileContext().delete(
-//        new Path(workSpace.getAbsolutePath()), true);
-//  }
-//
+  private static final Log LOG = LogFactory
+      .getLog(TestLinuxContainerExecutor.class);
+  
+  private static File workSpace = new File("target",
+      TestLinuxContainerExecutor.class.getName() + "-workSpace");
+  
+  private LinuxContainerExecutor exec = null;
+  private String appSubmitter = null;
+
+  @Before
+  public void setup() throws Exception {
+    FileContext.getLocalFSFileContext().mkdir(
+        new Path(workSpace.getAbsolutePath()), null, true);
+    workSpace.setReadable(true, false);
+    workSpace.setExecutable(true, false);
+    workSpace.setWritable(true, false);
+    String exec_path = System.getProperty("container-executor.path");
+    if(exec_path != null && !exec_path.isEmpty()) {
+      Configuration conf = new Configuration(false);
+      LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
+          +"="+exec_path);
+      conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
+      exec = new LinuxContainerExecutor();
+      exec.setConf(conf);
+    }
+    appSubmitter = System.getProperty("application.submitter");
+    if(appSubmitter == null || appSubmitter.isEmpty()) {
+      appSubmitter = "nobody";
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileContext.getLocalFSFileContext().delete(
+        new Path(workSpace.getAbsolutePath()), true);
+  }
+
+  private boolean shouldRun() {
+    if(exec == null) {
+      LOG.warn("Not running test because container-executor.path is not set");
+      return false;
+    }
+    return true;
+  }
+  
+  private String writeScriptFile(String ... cmd) throws IOException {
+    File f = File.createTempFile("TestLinuxContainerExecutor", ".sh");
+    f.deleteOnExit();
+    PrintWriter p = new PrintWriter(new FileOutputStream(f));
+    p.println("#!/bin/sh");
+    p.print("exec");
+    for(String part: cmd) {
+      p.print(" '");
+      p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
+      p.print("'");
+    }
+    p.println();
+    p.close();
+    return f.getAbsolutePath();
+  }
+  
+  private int id = 0;
+  private synchronized int getNextId() {
+    id += 1;
+    return id;
+  }
+  
+  private ContainerId getNextContainerId() {
+    ContainerId cId = mock(ContainerId.class);
+    String id = "CONTAINER_"+getNextId();
+    when(cId.toString()).thenReturn(id);
+    return cId;
+  }
+  
+
+  private int runAndBlock(String ... cmd) throws IOException {
+    return runAndBlock(getNextContainerId(), cmd);
+  }
+  
+  private int runAndBlock(ContainerId cId, String ... cmd) throws IOException {
+    String appId = "APP_"+getNextId();
+    Container container = mock(Container.class);
+    ContainerLaunchContext context = mock(ContainerLaunchContext.class);
+    HashMap<String, String> env = new HashMap<String,String>();
+
+    when(container.getContainerID()).thenReturn(cId);
+    when(container.getLaunchContext()).thenReturn(context);
+
+    when(context.getEnvironment()).thenReturn(env);
+    
+    String script = writeScriptFile(cmd);
+
+    Path scriptPath = new Path(script);
+    Path tokensPath = new Path("/dev/null");
+    Path workDir = new Path(workSpace.getAbsolutePath());
+    Path pidFile = new Path(workDir, "pid.txt");
+
+    exec.activateContainer(cId, pidFile);
+    return exec.launchContainer(container, scriptPath, tokensPath,
+        appSubmitter, appId, workDir);
+  }
+  
+  
+  @Test
+  public void testContainerLaunch() throws IOException {
+    if (!shouldRun()) {
+      return;
+    }
+
+    File touchFile = new File(workSpace, "touch-file");
+    int ret = runAndBlock("touch", touchFile.getAbsolutePath());
+    
+    assertEquals(0, ret);
+    FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
+          new Path(touchFile.getAbsolutePath()));
+    assertEquals(appSubmitter, fileStatus.getOwner());
+  }
+
   @Test
-  public void testCommandFilePreparation() throws IOException {
-//    LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
-//        "/bin/echo", "hello" }, null, null, "nobody"); // TODO: fix user name
-//    executor.prepareCommandFile(workSpace.getAbsolutePath());
-//
-//    // Now verify the contents of the commandFile
-//    File commandFile = new File(workSpace, LinuxContainerExecutor.COMMAND_FILE);
-//    BufferedReader reader = new BufferedReader(new FileReader(commandFile));
-//    Assert.assertEquals("/bin/echo hello", reader.readLine());
-//    Assert.assertEquals(null, reader.readLine());
-//    Assert.assertTrue(commandFile.canExecute());
-  }
-//
-//  @Test
-//  public void testContainerLaunch() throws IOException {
-//    String containerExecutorPath = System
-//        .getProperty("container-executor-path");
-//    if (containerExecutorPath == null || containerExecutorPath.equals("")) {
-//      LOG.info("Not Running test for lack of container-executor-path");
-//      return;
-//    }
-//
-//    String applicationSubmitter = "nobody";
-//
-//    File touchFile = new File(workSpace, "touch-file");
-//    LinuxContainerExecutor executor = new LinuxContainerExecutor(new String[] {
-//        "touch", touchFile.getAbsolutePath() }, workSpace, null,
-//        applicationSubmitter);
-//    executor.setCommandExecutorPath(containerExecutorPath);
-//    executor.execute();
-//
-//    FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
-//        new Path(touchFile.getAbsolutePath()));
-//    Assert.assertEquals(applicationSubmitter, fileStatus.getOwner());
-//  }
-//
-//  @Test
-//  public void testContainerKill() throws IOException, InterruptedException,
-//      IllegalArgumentException, SecurityException, IllegalAccessException,
-//      NoSuchFieldException {
-//    String containerExecutorPath = System
-//        .getProperty("container-executor-path");
-//    if (containerExecutorPath == null || containerExecutorPath.equals("")) {
-//      LOG.info("Not Running test for lack of container-executor-path");
-//      return;
-//    }
-//
-//    String applicationSubmitter = "nobody";
-//    final LinuxContainerExecutor executor = new LinuxContainerExecutor(
-//        new String[] { "sleep", "100" }, workSpace, null, applicationSubmitter);
-//    executor.setCommandExecutorPath(containerExecutorPath);
-//    new Thread() {
-//      public void run() {
-//        try {
-//          executor.execute();
-//        } catch (IOException e) {
-//          // TODO Auto-generated catch block
-//          e.printStackTrace();
-//        }
-//      };
-//    }.start();
-//
-//    String pid;
-//    while ((pid = executor.getPid()) == null) {
-//      LOG.info("Sleeping for 5 seconds before checking if "
-//          + "the process is alive.");
-//      Thread.sleep(5000);
-//    }
-//    LOG.info("Going to check the liveliness of the process with pid " + pid);
-//
-//    LinuxContainerExecutor checkLiveliness = new LinuxContainerExecutor(
-//        new String[] { "kill", "-0", "-" + pid }, workSpace, null,
-//        applicationSubmitter);
-//    checkLiveliness.setCommandExecutorPath(containerExecutorPath);
-//    checkLiveliness.execute();
-//
-//    LOG.info("Process is alive. "
-//        + "Sleeping for 5 seconds before killing the process.");
-//    Thread.sleep(5000);
-//    LOG.info("Going to killing the process.");
-//
-//    executor.kill();
-//
-//    LOG.info("Sleeping for 5 seconds before checking if "
-//        + "the process is alive.");
-//    Thread.sleep(5000);
-//    LOG.info("Going to check the liveliness of the process.");
-//
-//    // TODO: fix
-//    checkLiveliness = new LinuxContainerExecutor(new String[] { "kill", "-0",
-//        "-" + pid }, workSpace, null, applicationSubmitter);
-//    checkLiveliness.setCommandExecutorPath(containerExecutorPath);
-//    boolean success = false;
-//    try {
-//      checkLiveliness.execute();
-//      success = true;
-//    } catch (IOException e) {
-//      success = false;
-//    }
-//
-//    Assert.assertFalse(success);
-//  }
-}
\ No newline at end of file
+  public void testContainerKill() throws Exception {
+    if (!shouldRun()) {
+      return;
+    }
+    
+    final ContainerId sleepId = getNextContainerId();   
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          runAndBlock(sleepId, "sleep", "100");
+        } catch (IOException e) {
+          LOG.warn("Caught exception while running sleep",e);
+        }
+      };
+    };
+    t.setDaemon(true); //If it does not exit we shouldn't block the test.
+    t.start();
+
+    assertTrue(t.isAlive());
+   
+    String pid = null;
+    int count = 10;
+    while ((pid = exec.getProcessId(sleepId)) == null && count > 0) {
+      LOG.info("Sleeping for 200 ms before checking for pid ");
+      Thread.sleep(200);
+      count--;
+    }
+    assertNotNull(pid);
+
+    LOG.info("Going to killing the process.");
+    exec.signalContainer(appSubmitter, pid, Signal.TERM);
+    LOG.info("sleeping for 100ms to let the sleep be killed");
+    Thread.sleep(100);
+    
+    assertFalse(t.isAlive());
+  }
+}

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Nov  2 05:34:31 2011
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -52,12 +53,14 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.service.Service.STATE;
@@ -82,10 +85,16 @@ public class TestNodeStatusUpdater {
   int heartBeatID = 0;
   volatile Error nmStartError = null;
   private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
+  private final Configuration conf = new YarnConfiguration();
+  private NodeManager nm;
 
   @After
   public void tearDown() {
     this.registeredNodes.clear();
+    heartBeatID = 0;
+    if (nm != null) {
+      nm.stop();
+    }
     DefaultMetricsSystem.shutdown();
   }
 
@@ -167,7 +176,7 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(firstContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(2);
-        Container container = new ContainerImpl(null, launchContext, null, null);
+        Container container = new ContainerImpl(conf , null, launchContext, null, null);
         this.context.getContainers().put(firstContainerID, container);
       } else if (heartBeatID == 2) {
         // Checks on the RM end
@@ -191,7 +200,7 @@ public class TestNodeStatusUpdater {
         launchContext.setContainerId(secondContainerID);
         launchContext.setResource(recordFactory.newRecordInstance(Resource.class));
         launchContext.getResource().setMemory(3);
-        Container container = new ContainerImpl(null, launchContext, null, null);
+        Container container = new ContainerImpl(conf, null, launchContext, null, null);
         this.context.getContainers().put(secondContainerID, container);
       } else if (heartBeatID == 3) {
         // Checks on the RM end
@@ -217,6 +226,7 @@ public class TestNodeStatusUpdater {
   }
 
   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
+    public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
     private Context context;
 
     public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
@@ -229,10 +239,44 @@ public class TestNodeStatusUpdater {
 
     @Override
     protected ResourceTracker getRMClient() {
-      return new MyResourceTracker(this.context);
+      return resourceTracker;
     }
   }
+  
+  // 
+  private class MyResourceTracker2 implements ResourceTracker {
+    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
 
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+      
+      RegisterNodeManagerResponse response = recordFactory
+          .newRecordInstance(RegisterNodeManagerResponse.class);
+      RegistrationResponse regResponse = recordFactory
+      .newRecordInstance(RegistrationResponse.class);
+      regResponse.setNodeAction(registerNodeAction );
+      response.setRegistrationResponse(regResponse);
+      return response;
+    }
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID++);
+      HeartbeatResponse response = recordFactory
+          .newRecordInstance(HeartbeatResponse.class);
+      response.setResponseId(heartBeatID);
+      response.setNodeAction(heartBeatNodeAction);
+      
+      NodeHeartbeatResponse nhResponse = recordFactory
+      .newRecordInstance(NodeHeartbeatResponse.class);
+      nhResponse.setHeartbeatResponse(response);
+      return nhResponse;
+    }
+  }
+  
   @Before
   public void clearError() {
     nmStartError = null;
@@ -246,7 +290,7 @@ public class TestNodeStatusUpdater {
 
   @Test
   public void testNMRegistration() throws InterruptedException {
-    final NodeManager nm = new NodeManager() {
+    nm = new NodeManager() {
       @Override
       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
           Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -292,14 +336,85 @@ public class TestNodeStatusUpdater {
       Assert.fail("NodeManager failed to start");
     }
 
-    while (heartBeatID <= 3) {
+    waitCount = 0;
+    while (heartBeatID <= 3 && waitCount++ != 20) {
       Thread.sleep(500);
     }
+    Assert.assertFalse(heartBeatID <= 3);
     Assert.assertEquals("Number of registered NMs is wrong!!", 1,
         this.registeredNodes.size());
 
     nm.stop();
   }
+  
+  @Test
+  public void testNodeDecommision() throws Exception {
+    nm = getNodeManager(NodeAction.SHUTDOWN);
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    Assert.assertEquals(STATE.INITED, nm.getServiceState());
+    nm.start();
+
+    int waitCount = 0;
+    while (heartBeatID < 1 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(heartBeatID < 1);
+
+    // NM takes a while to reach the STOPPED state.
+    waitCount = 0;
+    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+      LOG.info("Waiting for NM to stop..");
+      Thread.sleep(1000);
+    }
+
+    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+  }
+
+  @Test
+  public void testNodeReboot() throws Exception {
+    nm = getNodeManager(NodeAction.REBOOT);
+    YarnConfiguration conf = createNMConfig();
+    nm.init(conf);
+    Assert.assertEquals(STATE.INITED, nm.getServiceState());
+    nm.start();
+
+    int waitCount = 0;
+    while (heartBeatID < 1 && waitCount++ != 20) {
+      Thread.sleep(500);
+    }
+    Assert.assertFalse(heartBeatID < 1);
+
+    // NM takes a while to reach the STOPPED state.
+    waitCount = 0;
+    while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+      LOG.info("Waiting for NM to stop..");
+      Thread.sleep(1000);
+    }
+
+    Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+  }
+  
+  @Test
+  public void testNMShutdownForRegistrationFailure() {
+
+    nm = new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+          ContainerTokenSecretManager containerTokenSecretManager) {
+        MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics,
+            containerTokenSecretManager);
+        MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+        myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
+        nodeStatusUpdater.resourceTracker = myResourceTracker2;
+        return nodeStatusUpdater;
+      }
+    };
+    verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
+        + "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
+  }
 
   /**
    * Verifies that if for some reason NM fails to start ContainerManager RPC
@@ -311,7 +426,7 @@ public class TestNodeStatusUpdater {
   @Test
   public void testNoRegistrationWhenNMServicesFail() {
 
-    final NodeManager nm = new NodeManager() {
+    nm = new NodeManager() {
       @Override
       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
           Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
@@ -324,9 +439,11 @@ public class TestNodeStatusUpdater {
       protected ContainerManagerImpl createContainerManager(Context context,
           ContainerExecutor exec, DeletionService del,
           NodeStatusUpdater nodeStatusUpdater,
-          ContainerTokenSecretManager containerTokenSecretManager) {
-        return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
-            metrics, containerTokenSecretManager) {
+          ContainerTokenSecretManager containerTokenSecretManager,
+          ApplicationACLsManager aclsManager) {
+        return new ContainerManagerImpl(context, exec, del,
+            nodeStatusUpdater, metrics, containerTokenSecretManager,
+            aclsManager) {
           @Override
           public void start() {
             // Simulating failure of starting RPC server
@@ -336,16 +453,22 @@ public class TestNodeStatusUpdater {
       }
     };
 
+    verifyNodeStartFailure("Starting of RPC Server failed");
+  }
+
+  private void verifyNodeStartFailure(String errMessage) {
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);
     try {
       nm.start();
       Assert.fail("NM should have failed to start. Didn't get exception!!");
     } catch (Exception e) {
-      Assert.assertEquals("Starting of RPC Server failed", e.getCause()
+      Assert.assertEquals(errMessage, e.getCause()
           .getMessage());
     }
-
+    
+    // The state change to STOPPED happens only if startup succeeds; when
+    // startup fails, the service remains in the INITED state.
     Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
         .getServiceState());
 
@@ -355,7 +478,7 @@ public class TestNodeStatusUpdater {
 
   private YarnConfiguration createNMConfig() {
     YarnConfiguration conf = new YarnConfiguration();
-    conf.setInt(YarnConfiguration.NM_VMEM_GB, 5); // 5GB
+    conf.setInt(YarnConfiguration.NM_PMEM_MB, 5*1024); // 5GB
     conf.set(YarnConfiguration.NM_ADDRESS, "127.0.0.1:12345");
     conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, "127.0.0.1:12346");
     conf.set(YarnConfiguration.NM_LOG_DIRS, new Path(basedir, "logs").toUri()
@@ -366,4 +489,21 @@ public class TestNodeStatusUpdater {
         .toUri().getPath());
     return conf;
   }
+  
+  private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
+    return new NodeManager() {
+      @Override
+      protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+          ContainerTokenSecretManager containerTokenSecretManager) {
+        MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
+            context, dispatcher, healthChecker, metrics,
+            containerTokenSecretManager);
+        MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
+        myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction;
+        myNodeStatusUpdater.resourceTracker = myResourceTracker2;
+        return myNodeStatusUpdater;
+      }
+    };
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/api/protocolrecords/impl/pb/TestPBLocalizerRPC.java Wed Nov  2 05:34:31 2011
@@ -20,7 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import java.net.InetSocketAddress;
 
-import org.apache.avro.ipc.Server;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -61,7 +61,7 @@ public class TestPBLocalizerRPC {
 
     public void stop() {
       if (server != null) {
-        server.close();
+        server.stop();
       }
     }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Wed Nov  2 05:34:31 2011
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.Service.STATE;
 import org.junit.After;
@@ -146,9 +147,9 @@ public abstract class BaseContainerManag
     delSrvc.init(conf);
 
     exec = createContainerExecutor();
-    containerManager =
-        new ContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater,
-                                 metrics, this.containerTokenSecretManager);
+    containerManager = new ContainerManagerImpl(context, exec, delSrvc,
+        nodeStatusUpdater, metrics, this.containerTokenSecretManager,
+        new ApplicationACLsManager(conf));
     containerManager.init(conf);
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java Wed Nov  2 05:34:31 2011
@@ -22,8 +22,12 @@ import org.junit.Test;
 import static org.junit.Assert.*;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -39,6 +43,7 @@ import org.apache.hadoop.yarn.service.Se
 import static org.apache.hadoop.yarn.service.Service.STATE.*;
 
 public class TestAuxServices {
+  private static final Log LOG = LogFactory.getLog(TestAuxServices.class);
 
   static class LightService extends AbstractService
       implements AuxServices.AuxiliaryService {
@@ -47,6 +52,7 @@ public class TestAuxServices {
     private int remaining_init;
     private int remaining_stop;
     private ByteBuffer meta = null;
+    private ArrayList<Integer> stoppedApps;
 
     LightService(String name, char idef, int expected_appId) {
       this(name, idef, expected_appId, null);
@@ -56,7 +62,13 @@ public class TestAuxServices {
       this.idef = idef;
       this.expected_appId = expected_appId;
       this.meta = meta;
+      this.stoppedApps = new ArrayList<Integer>();
     }
+
+    public ArrayList<Integer> getAppIdsStopped() {
+      return (ArrayList)this.stoppedApps.clone();
+    }
+
     @Override
     public void init(Configuration conf) {
       remaining_init = conf.getInt(idef + ".expected.init", 0);
@@ -77,7 +89,7 @@ public class TestAuxServices {
     }
     @Override
     public void stopApp(ApplicationId appId) {
-      assertEquals(expected_appId, appId.getId());
+      stoppedApps.add(appId.getId());
     }
     @Override
     public ByteBuffer getMeta() {
@@ -86,11 +98,15 @@ public class TestAuxServices {
   }
 
   static class ServiceA extends LightService {
-    public ServiceA() { super("A", 'A', 65, ByteBuffer.wrap("A".getBytes())); }
+    public ServiceA() { 
+      super("A", 'A', 65, ByteBuffer.wrap("A".getBytes()));
+    }
   }
 
   static class ServiceB extends LightService {
-    public ServiceB() { super("B", 'B', 66, ByteBuffer.wrap("B".getBytes())); }
+    public ServiceB() { 
+      super("B", 'B', 66, ByteBuffer.wrap("B".getBytes()));
+    }
   }
 
   @Test
@@ -119,6 +135,14 @@ public class TestAuxServices {
     appId.setId(66);
     event = new AuxServicesEvent(
         AuxServicesEventType.APPLICATION_STOP, "user0", appId, "Bsrv", null);
+    // verify all services got the stop event 
+    aux.handle(event);
+    Collection<AuxServices.AuxiliaryService> servs = aux.getServices();
+    for (AuxServices.AuxiliaryService serv: servs) {
+      ArrayList<Integer> appIds = ((LightService)serv).getAppIdsStopped();
+      assertEquals("app not properly stopped", 1, appIds.size());
+      assertTrue("wrong app stopped", appIds.contains((Integer)66));
+    }
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Wed Nov  2 05:34:31 2011
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
@@ -279,7 +280,7 @@ public class TestContainerManager extend
     gcsRequest.setContainerId(cId);
     ContainerStatus containerStatus = 
         containerManager.getContainerStatus(gcsRequest).getStatus();
-    Assert.assertEquals(ExitCode.KILLED.getExitCode(),
+    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
         containerStatus.getExitStatus());
 
     // Assert that the process is not alive anymore
@@ -385,7 +386,8 @@ public class TestContainerManager extend
     ContainerTokenSecretManager containerTokenSecretManager = new 
         ContainerTokenSecretManager();
     containerManager = new ContainerManagerImpl(context, exec, delSrvc,
-        nodeStatusUpdater, metrics, containerTokenSecretManager);
+        nodeStatusUpdater, metrics, containerTokenSecretManager,
+        new ApplicationACLsManager(conf));
     containerManager.init(conf);
     containerManager.start();
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java Wed Nov  2 05:34:31 2011
@@ -33,10 +33,12 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -45,6 +47,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -68,6 +71,7 @@ import org.mockito.ArgumentMatcher;
 public class TestContainer {
 
   final NodeManagerMetrics metrics = NodeManagerMetrics.create();
+  final Configuration conf = new YarnConfiguration();
 
   
   /**
@@ -165,7 +169,7 @@ public class TestContainer {
       wc.localizeResources();
       wc.launchContainer();
       reset(wc.localizerBus);
-      wc.containerFailed(ExitCode.KILLED.getExitCode());
+      wc.containerFailed(ExitCode.FORCE_KILLED.getExitCode());
       assertEquals(ContainerState.EXITED_WITH_FAILURE, 
           wc.c.getContainerState());
       verifyCleanupCall(wc);
@@ -222,6 +226,89 @@ public class TestContainer {
     }
   }
   
+  @Test
+  public void testKillOnLocalizationFailed() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(15, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.failLocalizeResources(wc.getLocalResourceCount());
+      assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      wc.killContainer();
+      assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+  
+  @Test
+  public void testResourceLocalizedOnLocalizationFailed() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      int failCount = wc.getLocalResourceCount()/2;
+      if (failCount == 0) {
+        failCount = 1;
+      }
+      wc.failLocalizeResources(failCount);
+      assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      wc.localizeResourcesFromInvalidState(failCount);
+      assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+  
+  @Test
+  public void testResourceFailedOnLocalizationFailed() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      
+      Iterator<String> lRsrcKeys = wc.localResources.keySet().iterator();
+      String key1 = lRsrcKeys.next();
+      String key2 = lRsrcKeys.next();
+      wc.failLocalizeSpecificResource(key1);
+      assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      wc.failLocalizeSpecificResource(key2);
+      assertEquals(ContainerState.LOCALIZATION_FAILED, wc.c.getContainerState());
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+  
+  @Test
+  public void testResourceFailedOnKilling() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(16, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      
+      Iterator<String> lRsrcKeys = wc.localResources.keySet().iterator();
+      String key1 = lRsrcKeys.next();
+      wc.killContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      wc.failLocalizeSpecificResource(key1);
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+  
   /**
    * Verify serviceData correctly sent.
    */
@@ -265,6 +352,26 @@ public class TestContainer {
     }
   }
 
+  @Test
+  public void testLaunchAfterKillRequest() throws Exception {
+    WrappedContainer wc = null;
+    try {
+      wc = new WrappedContainer(14, 314159265358979L, 4344, "yak");
+      wc.initContainer();
+      wc.localizeResources();
+      wc.killContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      wc.launchContainer();
+      assertEquals(ContainerState.KILLING, wc.c.getContainerState());
+      wc.containerKilledOnRequest();
+      verifyCleanupCall(wc);
+    } finally {
+      if (wc != null) {
+        wc.finished();
+      }
+    }
+  }
+  
   private void verifyCleanupCall(WrappedContainer wc) throws Exception {
     ResourcesReleasedMatcher matchesReq =
         new ResourcesReleasedMatcher(wc.localResources, EnumSet.of(
@@ -384,7 +491,7 @@ public class TestContainer {
   }
 
   private Container newContainer(Dispatcher disp, ContainerLaunchContext ctx) {
-    return new ContainerImpl(disp, ctx, null, metrics);
+    return new ContainerImpl(conf, disp, ctx, null, metrics);
   }
   
   @SuppressWarnings("unchecked")
@@ -468,11 +575,20 @@ public class TestContainer {
       drainDispatcherEvents();
     }
 
-    public Map<Path, String> localizeResources() throws URISyntaxException {
+    // Localize resources 
+    // Skip some resources so as to consider them failed
+    public Map<Path, String> doLocalizeResources(boolean checkLocalizingState,
+        int skipRsrcCount) throws URISyntaxException {
       Path cache = new Path("file:///cache");
       Map<Path, String> localPaths = new HashMap<Path, String>();
+      int counter = 0;
       for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
-        assertEquals(ContainerState.LOCALIZING, c.getContainerState());
+        if (counter++ < skipRsrcCount) {
+          continue;
+        }
+        if (checkLocalizingState) {
+          assertEquals(ContainerState.LOCALIZING, c.getContainerState());
+        }
         LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
         Path p = new Path(cache, rsrc.getKey());
         localPaths.put(p, rsrc.getKey());
@@ -483,6 +599,42 @@ public class TestContainer {
       drainDispatcherEvents();
       return localPaths;
     }
+    
+    
+    public Map<Path, String> localizeResources() throws URISyntaxException {
+      return doLocalizeResources(true, 0);
+    }
+    
+    public void localizeResourcesFromInvalidState(int skipRsrcCount)
+        throws URISyntaxException {
+      doLocalizeResources(false, skipRsrcCount);
+    }
+    
+    public void failLocalizeSpecificResource(String rsrcKey)
+        throws URISyntaxException {
+      LocalResource rsrc = localResources.get(rsrcKey);
+      LocalResourceRequest req = new LocalResourceRequest(rsrc);
+      Exception e = new Exception("Fake localization error");
+      c.handle(new ContainerResourceFailedEvent(c.getContainerID(), req, e));
+      drainDispatcherEvents();
+    }
+
+    // fail to localize some resources
+    public void failLocalizeResources(int failRsrcCount)
+        throws URISyntaxException {
+      int counter = 0;
+      for (Entry<String, LocalResource> rsrc : localResources.entrySet()) {
+        if (counter >= failRsrcCount) {
+          break;
+        }
+        ++counter;
+        LocalResourceRequest req = new LocalResourceRequest(rsrc.getValue());
+        Exception e = new Exception("Fake localization error");
+        c.handle(new ContainerResourceFailedEvent(c.getContainerID(), 
+                 req, e));
+      }
+      drainDispatcherEvents();     
+    }
 
     public void launchContainer() {
       c.handle(new ContainerEvent(cId, ContainerEventType.CONTAINER_LAUNCHED));
@@ -508,9 +660,13 @@ public class TestContainer {
 
     public void containerKilledOnRequest() {
       c.handle(new ContainerExitEvent(cId,
-          ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.KILLED
+          ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ExitCode.FORCE_KILLED
               .getExitCode()));
       drainDispatcherEvents();
     }
+    
+    public int getLocalResourceCount() {
+      return localResources.size();
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java Wed Nov  2 05:34:31 2011
@@ -33,7 +33,7 @@ import java.util.Set;
 
 import junit.framework.Assert;
 
-import org.apache.avro.ipc.Server;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.util.Conve
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
 import static org.mockito.Mockito.*;
 
@@ -146,7 +147,7 @@ public class TestResourceLocalizationSer
   @Test
   @SuppressWarnings("unchecked") // mocked generics
   public void testResourceRelease() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new YarnConfiguration();
     AbstractFileSystem spylfs =
       spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@@ -330,7 +331,7 @@ public class TestResourceLocalizationSer
   @Test
   @SuppressWarnings("unchecked") // mocked generics
   public void testLocalizationHeartbeat() throws Exception {
-    Configuration conf = new Configuration();
+    Configuration conf = new YarnConfiguration();
     AbstractFileSystem spylfs =
       spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     final FileContext lfs = FileContext.getFileContext(spylfs, conf);
@@ -355,7 +356,8 @@ public class TestResourceLocalizationSer
     dispatcher.register(ContainerEventType.class, containerBus);
 
     ContainerExecutor exec = mock(ContainerExecutor.class);
-    DeletionService delService = new DeletionService(exec);
+    DeletionService delServiceReal = new DeletionService(exec);
+    DeletionService delService = spy(delServiceReal);
     delService.init(null);
     delService.start();
 
@@ -407,12 +409,14 @@ public class TestResourceLocalizationSer
       rsrcs.put(LocalResourceVisibility.PRIVATE, Collections.singletonList(req));
       spyService.handle(new ContainerLocalizationRequestEvent(c, rsrcs));
       // Sigh. Thread init of private localizer not accessible
-      Thread.sleep(500);
+      Thread.sleep(1000);
       dispatcher.await();
       String appStr = ConverterUtils.toString(appId);
       String ctnrStr = c.getContainerID().toString();
-      verify(exec).startLocalizer(isA(Path.class), isA(InetSocketAddress.class),
-            eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+      ArgumentCaptor<Path> tokenPathCaptor = ArgumentCaptor.forClass(Path.class);
+      verify(exec).startLocalizer(tokenPathCaptor.capture(), isA(InetSocketAddress.class),
+        eq("user0"), eq(appStr), eq(ctnrStr), isA(List.class));
+      Path localizationTokenPath = tokenPathCaptor.getValue();
 
       // heartbeat from localizer
       LocalResourceStatus rsrcStat = mock(LocalResourceStatus.class);
@@ -454,10 +458,13 @@ public class TestResourceLocalizationSer
         };
       dispatcher.await();
       verify(containerBus).handle(argThat(matchesContainerLoc));
+      
+      // Verify deletion of localization token.
+      verify(delService).delete((String)isNull(), eq(localizationTokenPath));
     } finally {
-      delService.stop();
-      dispatcher.stop();
       spyService.stop();
+      dispatcher.stop();
+      delService.stop();
     }
   }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceRetention.java Wed Nov  2 05:34:31 2011
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMa
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 
 import org.junit.Test;
@@ -82,7 +83,7 @@ public class TestResourceRetention {
     for (int i = 0; i < nRsrcs; ++i) {
       final LocalResourceRequest req = new LocalResourceRequest(
           new Path("file:///" + user + "/rsrc" + i), timestamp + i * tsstep,
-          LocalResourceType.FILE);
+          LocalResourceType.FILE, LocalResourceVisibility.PUBLIC);
       final long ts = timestamp + i * tsstep;
       final Path p = new Path("file:///local/" + user + "/rsrc" + i);
       LocalizedResource rsrc = new LocalizedResource(req, null) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed Nov  2 05:34:31 2011
@@ -18,6 +18,12 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.File;
@@ -28,8 +34,10 @@ import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import junit.framework.Assert;
 
@@ -41,6 +49,7 @@ import org.apache.hadoop.io.DataOutputBu
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -50,24 +59,31 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogReader;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
 
-@Ignore
+//@Ignore
 public class TestLogAggregationService extends BaseContainerManagerTest {
 
+  private Map<ApplicationAccessType, String> acls = createAppAcls();
+  
   static {
     LOG = LogFactory.getLog(TestLogAggregationService.class);
   }
@@ -91,17 +107,24 @@ public class TestLogAggregationService e
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testLocalFileDeletionAfterUpload() throws IOException {
     this.delSrvc = new DeletionService(createContainerExecutor());
     this.delSrvc.init(conf);
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
+    
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    
     LogAggregationService logAggregationService =
-        new LogAggregationService(this.context, this.delSrvc);
+        new LogAggregationService(dispatcher, this.context, this.delSrvc);
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
+    
     ApplicationId application1 = BuilderUtils.newApplicationId(1234, 1);
 
     // AppLogDir should be created
@@ -109,25 +132,24 @@ public class TestLogAggregationService e
         new File(localLogDir, ConverterUtils.toString(application1));
     app1LogDir.mkdir();
     logAggregationService
-        .handle(new LogAggregatorAppStartedEvent(
+        .handle(new LogHandlerAppStartedEvent(
             application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
 
-    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
-    appAttemptId.setApplicationId(application1);
-    appAttemptId.setAttemptId(1);
-    ContainerId container11 =
-        BuilderUtils.newContainerId(recordFactory, application1, appAttemptId, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(application1, 1);
+    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId, 1);
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container11, 0));
+        new LogHandlerContainerFinishedEvent(container11, 0));
 
-    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application1));
 
     logAggregationService.stop();
 
+    
     String containerIdStr = ConverterUtils.toString(container11);
     File containerLogDir = new File(app1LogDir, containerIdStr);
     for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
@@ -136,17 +158,36 @@ public class TestLogAggregationService e
 
     Assert.assertFalse(app1LogDir.exists());
 
-    Assert.assertTrue(new File(logAggregationService
-        .getRemoteNodeLogFileForApp(application1).toUri().getPath()).exists());
+    Path logFilePath =
+        logAggregationService.getRemoteNodeLogFileForApp(application1,
+            this.user);
+    Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
+        logFilePath.toUri().getPath()).exists());
+    
+    dispatcher.await();
+    ArgumentCaptor<ApplicationEvent> eventCaptor =
+        ArgumentCaptor.forClass(ApplicationEvent.class);
+    verify(appEventHandler).handle(eventCaptor.capture());
+    assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+        eventCaptor.getValue().getType());
+    assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
+        .getApplicationID());
+    
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testNoContainerOnNode() {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
+    
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    
     LogAggregationService logAggregationService =
-        new LogAggregationService(this.context, this.delSrvc);
+        new LogAggregationService(dispatcher, this.context, this.delSrvc);
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
@@ -157,29 +198,44 @@ public class TestLogAggregationService e
       new File(localLogDir, ConverterUtils.toString(application1));
     app1LogDir.mkdir();
     logAggregationService
-        .handle(new LogAggregatorAppStartedEvent(
+        .handle(new LogHandlerAppStartedEvent(
             application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
 
-    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application1));
 
     logAggregationService.stop();
 
-    Assert
-        .assertFalse(new File(logAggregationService
-            .getRemoteNodeLogFileForApp(application1).toUri().getPath())
-            .exists());
+    Assert.assertFalse(new File(logAggregationService
+        .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
+        .exists());
+    
+    dispatcher.await();
+    ArgumentCaptor<ApplicationEvent> eventCaptor =
+        ArgumentCaptor.forClass(ApplicationEvent.class);
+    verify(appEventHandler).handle(eventCaptor.capture());
+    assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+        eventCaptor.getValue().getType());
+    assertEquals(application1, eventCaptor.getValue()
+        .getApplicationID());
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testMultipleAppsLogAggregation() throws IOException {
 
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
+    
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    
     LogAggregationService logAggregationService =
-        new LogAggregationService(this.context, this.delSrvc);
+        new LogAggregationService(dispatcher, this.context, this.delSrvc);
     logAggregationService.init(this.conf);
     logAggregationService.start();
 
@@ -190,92 +246,80 @@ public class TestLogAggregationService e
       new File(localLogDir, ConverterUtils.toString(application1));
     app1LogDir.mkdir();
     logAggregationService
-        .handle(new LogAggregatorAppStartedEvent(
+        .handle(new LogHandlerAppStartedEvent(
             application1, this.user, null,
-            ContainerLogsRetentionPolicy.ALL_CONTAINERS));
+            ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
 
-    ApplicationAttemptId appAttemptId1 = 
-        recordFactory.newRecordInstance(ApplicationAttemptId.class);
-    appAttemptId1.setApplicationId(application1);
-    ContainerId container11 =
-        BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 1);
+    ApplicationAttemptId appAttemptId1 =
+        BuilderUtils.newApplicationAttemptId(application1, 1);
+    ContainerId container11 = BuilderUtils.newContainerId(appAttemptId1, 1);
+    
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container11, 0));
+        new LogHandlerContainerFinishedEvent(container11, 0));
 
     ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
-    ApplicationAttemptId appAttemptId2 = 
-        recordFactory.newRecordInstance(ApplicationAttemptId.class);
-    appAttemptId1.setApplicationId(application2);
+    ApplicationAttemptId appAttemptId2 =
+        BuilderUtils.newApplicationAttemptId(application2, 1);
 
     File app2LogDir =
       new File(localLogDir, ConverterUtils.toString(application2));
     app2LogDir.mkdir();
-    logAggregationService.handle(new LogAggregatorAppStartedEvent(
+    logAggregationService.handle(new LogHandlerAppStartedEvent(
         application2, this.user, null,
-        ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY));
+        ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
 
     
-    ContainerId container21 =
-        BuilderUtils.newContainerId(recordFactory, application2, 
-            appAttemptId2, 1);
+    ContainerId container21 = BuilderUtils.newContainerId(appAttemptId2, 1);
+    
     writeContainerLogs(app2LogDir, container21);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container21, 0));
+        new LogHandlerContainerFinishedEvent(container21, 0));
+
+    ContainerId container12 = BuilderUtils.newContainerId(appAttemptId1, 2);
 
-    ContainerId container12 =
-        BuilderUtils.newContainerId(recordFactory, application1, appAttemptId1, 
-            2);
     writeContainerLogs(app1LogDir, container12);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container12, 0));
+        new LogHandlerContainerFinishedEvent(container12, 0));
 
     ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
-    ApplicationAttemptId appAttemptId3 = 
-        recordFactory.newRecordInstance(ApplicationAttemptId.class);
-    appAttemptId1.setApplicationId(application3);
+    ApplicationAttemptId appAttemptId3 =
+        BuilderUtils.newApplicationAttemptId(application3, 1);
 
     File app3LogDir =
       new File(localLogDir, ConverterUtils.toString(application3));
     app3LogDir.mkdir();
-    logAggregationService.handle(new LogAggregatorAppStartedEvent(
-        application3, this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY));
-
-    ContainerId container31 =
-        BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 
-            1);
+    logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
+        this.user, null,
+        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
+        
+
+    ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
     writeContainerLogs(app3LogDir, container31);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container31, 0));
+        new LogHandlerContainerFinishedEvent(container31, 0));
 
-    ContainerId container32 =
-        BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3,
-            2);
+    ContainerId container32 = BuilderUtils.newContainerId(appAttemptId3, 2);
     writeContainerLogs(app3LogDir, container32);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container32, 1)); // Failed 
+        new LogHandlerContainerFinishedEvent(container32, 1)); // Failed 
 
-    ContainerId container22 =
-        BuilderUtils.newContainerId(recordFactory, application2, appAttemptId2, 
-            2);
+    ContainerId container22 = BuilderUtils.newContainerId(appAttemptId2, 2);
     writeContainerLogs(app2LogDir, container22);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container22, 0));
+        new LogHandlerContainerFinishedEvent(container22, 0));
 
-    ContainerId container33 =
-        BuilderUtils.newContainerId(recordFactory, application3, appAttemptId3, 
-            3);
+    ContainerId container33 = BuilderUtils.newContainerId(appAttemptId3, 3);
     writeContainerLogs(app3LogDir, container33);
     logAggregationService.handle(
-        new LogAggregatorContainerFinishedEvent(container33, 0));
+        new LogHandlerContainerFinishedEvent(container33, 0));
 
-    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application2));
-    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application3));
-    logAggregationService.handle(new LogAggregatorAppFinishedEvent(
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application1));
 
     logAggregationService.stop();
@@ -286,6 +330,22 @@ public class TestLogAggregationService e
         new ContainerId[] { container21 });
     verifyContainerLogs(logAggregationService, application3,
         new ContainerId[] { container31, container32 });
+    
+    dispatcher.await();
+    ArgumentCaptor<ApplicationEvent> eventCaptor =
+        ArgumentCaptor.forClass(ApplicationEvent.class);
+
+    verify(appEventHandler, times(3)).handle(eventCaptor.capture());
+    List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
+    Set<ApplicationId> appIds = new HashSet<ApplicationId>();
+    for (ApplicationEvent cap : capturedEvents) {
+      assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+          cap.getType());
+      appIds.add(cap.getApplicationID());
+    }
+    assertTrue(appIds.contains(application1));
+    assertTrue(appIds.contains(application2));
+    assertTrue(appIds.contains(application3));
   }
 
   private void writeContainerLogs(File appLogDir, ContainerId containerId)
@@ -306,7 +366,11 @@ public class TestLogAggregationService e
       ContainerId[] expectedContainerIds) throws IOException {
     AggregatedLogFormat.LogReader reader =
         new AggregatedLogFormat.LogReader(this.conf,
-            logAggregationService.getRemoteNodeLogFileForApp(appId));
+            logAggregationService.getRemoteNodeLogFileForApp(appId, this.user));
+    
+    Assert.assertEquals(this.user, reader.getApplicationOwner());
+    verifyAcls(reader.getApplicationAcls());
+    
     try {
       Map<String, Map<String, String>> logMap =
           new HashMap<String, Map<String, String>>();
@@ -382,6 +446,7 @@ public class TestLogAggregationService e
 
     this.containerManager.start();
 
+
     File scriptFile = new File(tmpDir, "scriptFile.sh");
     PrintWriter fileWriter = new PrintWriter(scriptFile);
     fileWriter.write("\necho Hello World! Stdout! > "
@@ -400,13 +465,10 @@ public class TestLogAggregationService e
         recordFactory.newRecordInstance(ApplicationId.class);
     appId.setClusterTimestamp(0);
     appId.setId(0);
-    ApplicationAttemptId appAttemptId = 
-        recordFactory.newRecordInstance(ApplicationAttemptId.class);
-    appAttemptId.setApplicationId(appId);
-    appAttemptId.setAttemptId(1);
-    ContainerId cId = recordFactory.newRecordInstance(ContainerId.class);
-    cId.setId(0);
-    cId.setApplicationAttemptId(appAttemptId);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+    ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 0);
+
     containerLaunchContext.setContainerId(cId);
 
     containerLaunchContext.setUser(this.user);
@@ -446,4 +508,27 @@ public class TestLogAggregationService e
         .asList(appId)));
     this.containerManager.stop();
   }
+
+  private void verifyAcls(Map<ApplicationAccessType, String> logAcls) {
+    Assert.assertEquals(this.acls.size(), logAcls.size());
+    for (ApplicationAccessType appAccessType : this.acls.keySet()) {
+      Assert.assertEquals(this.acls.get(appAccessType),
+          logAcls.get(appAccessType));
+    }
+  }
+
+  private DrainDispatcher createDispatcher() {
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(this.conf);
+    dispatcher.start();
+    return dispatcher;
+  }
+  
+  private Map<ApplicationAccessType, String> createAppAcls() {
+    Map<ApplicationAccessType, String> appAcls =
+        new HashMap<ApplicationAccessType, String>();
+    appAcls.put(ApplicationAccessType.MODIFY_APP, "user group");
+    appAcls.put(ApplicationAccessType.VIEW_APP, "*");
+    return appAcls;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java Wed Nov  2 05:34:31 2011
@@ -262,16 +262,17 @@ public class TestContainersMonitor exten
     gcsRequest.setContainerId(cId);
     ContainerStatus containerStatus =
         containerManager.getContainerStatus(gcsRequest).getStatus();
-    Assert.assertEquals(ExitCode.KILLED.getExitCode(),
+    Assert.assertEquals(ExitCode.TERMINATED.getExitCode(),
         containerStatus.getExitStatus());
     String expectedMsgPattern =
         "Container \\[pid=" + pid + ",containerID=" + cId
-            + "\\] is running beyond memory-limits. Current usage : "
-            + "[0-9]*bytes. Limit : [0-9]*"
-            + "bytes. Killing container. \nDump of the process-tree for "
-            + cId + " : \n";
+            + "\\] is running beyond virtual memory limits. Current usage: "
+            + "[0-9.]+m?b of [0-9.]+m?b physical memory used; "
+            + "[0-9.]+m?b of [0-9.]+m?b virtual memory used. "
+            + "Killing container.\nDump of the process-tree for "
+            + cId + " :\n";
     Pattern pat = Pattern.compile(expectedMsgPattern);
-    Assert.assertEquals("Expected message patterns is: " + expectedMsgPattern
+    Assert.assertEquals("Expected message pattern is: " + expectedMsgPattern
         + "\n\nObserved message is: " + containerStatus.getDiagnostics(),
         true, pat.matcher(containerStatus.getDiagnostics()).find());
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java Wed Nov  2 05:34:31 2011
@@ -18,6 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.webapp;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -41,11 +44,11 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
 import org.junit.Test;
-import static org.mockito.Mockito.*;
 
 public class TestNMWebServer {
 
@@ -58,7 +61,7 @@ public class TestNMWebServer {
   }
 
   @Test
-  public void testNMWebApp() throws InterruptedException, IOException {
+  public void testNMWebApp() throws IOException {
     Context nmContext = new NodeManager.NMContext();
     ResourceView resourceView = new ResourceView() {
       @Override
@@ -70,8 +73,9 @@ public class TestNMWebServer {
         return 0;
       }
     };
-    WebServer server = new WebServer(nmContext, resourceView);
     Configuration conf = new Configuration();
+    WebServer server = new WebServer(nmContext, resourceView,
+        new ApplicationACLsManager(conf));
     conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
     server.init(conf);
     server.start();
@@ -88,9 +92,8 @@ public class TestNMWebServer {
     when(app.getUser()).thenReturn(user);
     when(app.getAppId()).thenReturn(appId);
     nmContext.getApplications().put(appId, app);
-    ApplicationAttemptId appAttemptId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
-    appAttemptId.setApplicationId(appId);
-    appAttemptId.setAttemptId(1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
     ContainerId container1 =
         BuilderUtils.newContainerId(recordFactory, appId, appAttemptId, 0);
     ContainerId container2 =
@@ -104,7 +107,7 @@ public class TestNMWebServer {
       launchContext.setContainerId(containerId);
       launchContext.setUser(user);
       Container container =
-          new ContainerImpl(dispatcher, launchContext, null, metrics) {
+          new ContainerImpl(conf, dispatcher, launchContext, null, metrics) {
         @Override
         public ContainerState getContainerState() {
           return ContainerState.RUNNING;

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml Wed Nov  2 05:34:31 2011
@@ -16,15 +16,16 @@
   <parent>
     <artifactId>hadoop-yarn-server</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>${yarn.version}</version>
+    <version>0.24.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+  <version>0.24.0-SNAPSHOT</version>
   <name>hadoop-yarn-server-resourcemanager</name>
 
   <properties>
-    <install.file>${project.artifact.file}</install.file>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
     <yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
   </properties>
 
@@ -33,6 +34,10 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+    </dependency>
   </dependencies>
 
   <build>



Mime
View raw message