hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1532910 [2/5] - in /hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./ hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java...
Date Wed, 16 Oct 2013 21:07:44 GMT
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java Wed Oct 16 21:07:28 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.http.HttpConfig.Policy;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
@@ -97,8 +98,14 @@ public class WebAppUtils {
   }
   
   public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf) {
+    return getResolvedRMWebAppURLWithoutScheme(conf,
+        HttpConfig.isSecure() ? Policy.HTTPS_ONLY : Policy.HTTP_ONLY);
+  }
+  
+  public static String getResolvedRMWebAppURLWithoutScheme(Configuration conf,
+      Policy httpPolicy) {
     InetSocketAddress address = null;
-    if (HttpConfig.isSecure()) {
+    if (httpPolicy == Policy.HTTPS_ONLY) {
       address =
           conf.getSocketAddr(YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS,
               YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_ADDRESS,
@@ -136,6 +143,21 @@ public class WebAppUtils {
       return conf.get(YarnConfiguration.NM_WEBAPP_ADDRESS,
         YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS);
     }
-
+  }
+  
+  /**
+   * Returns url unchanged if it already has a scheme (contains "://" after
+   * its first character); otherwise returns schemePrefix prepended to url.
+   * @param schemePrefix eg. http:// or https://
+   * @param url url with or without a scheme
+   * @return url with scheme
+   */
+  public static String getURLWithScheme(String schemePrefix, String url) {
+    // If scheme is provided then it will be returned as it is
+    if (url.indexOf("://") > 0) {
+      return url;
+    } else {
+      return schemePrefix + url;
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed Oct 16 21:07:28 2013
@@ -100,12 +100,29 @@
   </property>
 
   <property>
-    <description>The address of the RM web application.</description>
+      <description>
+        This configures the HTTP endpoint for Yarn Daemons. The following
+        values are supported:
+        - HTTP_ONLY : Service is provided only on http
+        - HTTPS_ONLY : Service is provided only on https
+      </description>
+      <name>yarn.http.policy</name>
+      <value>HTTP_ONLY</value>
+  </property>
+
+  <property>
+    <description>The http address of the RM web application.</description>
     <name>yarn.resourcemanager.webapp.address</name>
     <value>${yarn.resourcemanager.hostname}:8088</value>
   </property>
 
   <property>
+    <description>The https address of the RM web application.</description>
+    <name>yarn.resourcemanager.webapp.https.address</name>
+    <value>${yarn.resourcemanager.hostname}:8090</value>
+  </property>
+
+  <property>
     <name>yarn.resourcemanager.resource-tracker.address</name>
     <value>${yarn.resourcemanager.hostname}:8031</value>
   </property>
@@ -314,14 +331,35 @@
   </property>
 
   <property>
-    <description>Enable RM high-availability. When enabled, the RM starts
-    in the Standby mode by default, and transitions to the Active mode when
-    prompted to.</description>
+    <description>Enable RM high-availability. When enabled,
+      (1) The RM starts in the Standby mode by default, and transitions to
+      the Active mode when prompted to.
+      (2) The nodes in the RM ensemble are listed in
+      yarn.resourcemanager.ha.rm-ids
+      (3) The id of each RM comes from yarn.resourcemanager.ha.id
+      (4) The actual physical addresses come from the configs of the pattern
+      - {rpc-config}.{id}</description>
     <name>yarn.resourcemanager.ha.enabled</name>
     <value>false</value>
   </property>
 
   <property>
+    <description>The list of RM nodes in the cluster when HA is
+      enabled. See description of yarn.resourcemanager.ha.enabled
+      for full details on how this is used.</description>
+    <name>yarn.resourcemanager.ha.rm-ids</name>
+    <!--value>rm1,rm2</value-->
+  </property>
+
+  <property>
+    <description>The id (string) of the current RM. When HA is enabled, this
+      is a required config. See description of yarn.resourcemanager.ha.enabled
+      for full details on how this is used.</description>
+    <name>yarn.resourcemanager.ha.id</name>
+    <!--value>rm1</value-->
+  </property>
+
+  <property>
     <description>The maximum number of completed applications RM keeps. </description>
     <name>yarn.resourcemanager.max-completed-applications</name>
     <value>10000</value>
@@ -701,6 +739,21 @@
   </property>
 
   <property>
+    <description>The UNIX user that containers will run as when Linux-container-executor
+    is used in nonsecure mode (a use case for this is using cgroups).</description>
+    <name>yarn.nodemanager.linux-container-executor.nonsecure-mode.local-user</name>
+    <value>nobody</value>
+  </property>
+
+  <property>
+    <description>The allowed pattern for UNIX user names enforced by
+    Linux-container-executor when used in nonsecure mode (use case for this
+    is using cgroups). The default value is taken from /usr/sbin/adduser</description>
+    <name>yarn.nodemanager.linux-container-executor.nonsecure-mode.user-pattern</name>
+    <value>^[_.A-Za-z0-9][-@_.A-Za-z0-9]{0,255}?[$]?$</value>
+  </property>
+
+  <property>
     <description>T-file compression types used to compress aggregated logs.</description>
     <name>yarn.nodemanager.log-aggregation.compression-type</name>
     <value>none</value>

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestConverterUtils.java Wed Oct 16 21:07:28 2013
@@ -39,6 +39,14 @@ public class TestConverterUtils {
   }
 
   @Test
+  public void testConvertUrlWithUserinfo() throws URISyntaxException {
+    Path expectedPath = new Path("foo://username:password@example.com:8042");
+    URL url = ConverterUtils.getYarnUrlFromPath(expectedPath);
+    Path actualPath = ConverterUtils.getPathFromYarnURL(url);
+    assertEquals(expectedPath, actualPath);
+  }
+  
+  @Test
   public void testContainerId() throws URISyntaxException {
     ContainerId id = TestContainerId.newContainerId(0, 0, 0, 0);
     String cid = ConverterUtils.toString(id);

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java Wed Oct 16 21:07:28 2013
@@ -41,6 +41,7 @@ import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipOutputStream;
+import java.util.zip.GZIPOutputStream;
 
 import junit.framework.Assert;
 
@@ -72,6 +73,9 @@ public class TestFSDownload {
   private static final Log LOG = LogFactory.getLog(TestFSDownload.class);
   private static AtomicLong uniqueNumberGenerator =
     new AtomicLong(System.currentTimeMillis());
+  private enum TEST_FILE_TYPE {
+    TAR, JAR, ZIP, TGZ
+  };
   
   @AfterClass
   public static void deleteTestDir() throws IOException {
@@ -121,7 +125,7 @@ public class TestFSDownload {
     ret.setPattern("classes/.*");
     return ret;
   }
-  
+
   static LocalResource createTarFile(FileContext files, Path p, int len,
       Random r, LocalResourceVisibility vis) throws IOException,
       URISyntaxException {
@@ -149,7 +153,35 @@ public class TestFSDownload {
         .getModificationTime());
     return ret;
   }
-  
+
+  static LocalResource createTgzFile(FileContext files, Path p, int len,
+      Random r, LocalResourceVisibility vis) throws IOException,
+      URISyntaxException {
+    byte[] bytes = new byte[len];
+    r.nextBytes(bytes);
+
+    File gzipFile = new File(p.toUri().getPath() + ".tar.gz");
+    gzipFile.createNewFile();
+    TarArchiveOutputStream out = new TarArchiveOutputStream(
+        new GZIPOutputStream(new FileOutputStream(gzipFile)));
+    TarArchiveEntry entry = new TarArchiveEntry(p.getName());
+    entry.setSize(bytes.length);
+    out.putArchiveEntry(entry);
+    out.write(bytes);
+    out.closeArchiveEntry();
+    out.close();
+
+    LocalResource ret = recordFactory.newRecordInstance(LocalResource.class);
+    ret.setResource(ConverterUtils.getYarnUrlFromPath(new Path(p.toString()
+        + ".tar.gz")));
+    ret.setSize(len);
+    ret.setType(LocalResourceType.ARCHIVE);
+    ret.setVisibility(vis);
+    ret.setTimestamp(files.getFileStatus(new Path(p.toString() + ".tar.gz"))
+        .getModificationTime());
+    return ret;
+  }
+
   static LocalResource createJarFile(FileContext files, Path p, int len,
       Random r, LocalResourceVisibility vis) throws IOException,
       URISyntaxException {
@@ -175,7 +207,7 @@ public class TestFSDownload {
         .getModificationTime());
     return ret;
   }
-  
+
   static LocalResource createZipFile(FileContext files, Path p, int len,
       Random r, LocalResourceVisibility vis) throws IOException,
       URISyntaxException {
@@ -201,7 +233,7 @@ public class TestFSDownload {
         .getModificationTime());
     return ret;
   }
-  
+
   @Test (timeout=10000)
   public void testDownloadBadPublic() throws IOException, URISyntaxException,
       InterruptedException {
@@ -252,7 +284,7 @@ public class TestFSDownload {
       Assert.assertTrue(e.getCause() instanceof IOException);
     }
   }
-  
+
   @Test (timeout=10000)
   public void testDownload() throws IOException, URISyntaxException,
       InterruptedException {
@@ -326,10 +358,9 @@ public class TestFSDownload {
       throw new IOException("Failed exec", e);
     }
   }
-  
-  @Test (timeout=10000) 
-  public void testDownloadArchive() throws IOException, URISyntaxException,
-      InterruptedException {
+
+  private void downloadWithFileType(TEST_FILE_TYPE fileType) throws IOException, 
+      URISyntaxException, InterruptedException{
     Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
     FileContext files = FileContext.getLocalFSFileContext(conf);
@@ -352,7 +383,22 @@ public class TestFSDownload {
     LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
 
     Path p = new Path(basedir, "" + 1);
-    LocalResource rsrc = createTarFile(files, p, size, rand, vis);
+    LocalResource rsrc = null;
+    switch (fileType) {
+    case TAR:
+      rsrc = createTarFile(files, p, size, rand, vis);
+      break;
+    case JAR:
+      rsrc = createJarFile(files, p, size, rand, vis);
+      rsrc.setType(LocalResourceType.PATTERN);
+      break;
+    case ZIP:
+      rsrc = createZipFile(files, p, size, rand, vis);
+      break;
+    case TGZ:
+      rsrc = createTgzFile(files, p, size, rand, vis);
+      break;
+    }
     Path destPath = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
     destPath = new Path (destPath,
         Long.toString(uniqueNumberGenerator.incrementAndGet()));
@@ -371,7 +417,7 @@ public class TestFSDownload {
           FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
               filestatus.getPath());
           for (FileStatus childfile : childFiles) {
-            if (childfile.getPath().getName().equalsIgnoreCase("1.tar.tmp")) {
+            if (childfile.getPath().getName().startsWith("tmp")) {
               Assert.fail("Tmp File should not have been there "
                   + childfile.getPath());
             }
@@ -384,118 +430,29 @@ public class TestFSDownload {
   }
 
   @Test (timeout=10000) 
-  public void testDownloadPatternJar() throws IOException, URISyntaxException,
+  public void testDownloadArchive() throws IOException, URISyntaxException,
       InterruptedException {
-    Configuration conf = new Configuration();
-    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
-    FileContext files = FileContext.getLocalFSFileContext(conf);
-    final Path basedir = files.makeQualified(new Path("target",
-        TestFSDownload.class.getSimpleName()));
-    files.mkdir(basedir, null, true);
-    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
-
-    Random rand = new Random();
-    long sharedSeed = rand.nextLong();
-    rand.setSeed(sharedSeed);
-    System.out.println("SEED: " + sharedSeed);
-
-    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
-    ExecutorService exec = Executors.newSingleThreadExecutor();
-    LocalDirAllocator dirs = new LocalDirAllocator(
-        TestFSDownload.class.getName());
-
-    int size = rand.nextInt(512) + 512;
-    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
-
-    Path p = new Path(basedir, "" + 1);
-    LocalResource rsrcjar = createJarFile(files, p, size, rand, vis);
-    rsrcjar.setType(LocalResourceType.PATTERN);
-    Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
-    destPathjar = new Path (destPathjar,
-        Long.toString(uniqueNumberGenerator.incrementAndGet()));
-    FSDownload fsdjar = new FSDownload(files,
-        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrcjar);
-    pending.put(rsrcjar, exec.submit(fsdjar));
-    exec.shutdown();
-    while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
-    Assert.assertTrue(pending.get(rsrcjar).isDone());
+    downloadWithFileType(TEST_FILE_TYPE.TAR);
+  }
 
-    try {
-      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
-          basedir);
-      for (FileStatus filestatus : filesstatus) {
-        if (filestatus.isDirectory()) {
-          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
-              filestatus.getPath());
-          for (FileStatus childfile : childFiles) {
-            if (childfile.getPath().getName().equalsIgnoreCase("1.jar.tmp")) {
-              Assert.fail("Tmp File should not have been there "
-                  + childfile.getPath());
-            }
-          }
-        }
-      }
-    }catch (Exception e) {
-      throw new IOException("Failed exec", e);
-    }
+  @Test (timeout=10000)
+  public void testDownloadPatternJar() throws IOException, URISyntaxException,
+      InterruptedException {
+    downloadWithFileType(TEST_FILE_TYPE.JAR);
   }
-  
+
   @Test (timeout=10000) 
   public void testDownloadArchiveZip() throws IOException, URISyntaxException,
       InterruptedException {
-    Configuration conf = new Configuration();
-    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
-    FileContext files = FileContext.getLocalFSFileContext(conf);
-    final Path basedir = files.makeQualified(new Path("target",
-        TestFSDownload.class.getSimpleName()));
-    files.mkdir(basedir, null, true);
-    conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
-
-    Random rand = new Random();
-    long sharedSeed = rand.nextLong();
-    rand.setSeed(sharedSeed);
-    System.out.println("SEED: " + sharedSeed);
-
-    Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
-    ExecutorService exec = Executors.newSingleThreadExecutor();
-    LocalDirAllocator dirs = new LocalDirAllocator(
-        TestFSDownload.class.getName());
-
-    int size = rand.nextInt(512) + 512;
-    LocalResourceVisibility vis = LocalResourceVisibility.PRIVATE;
-
-    Path p = new Path(basedir, "" + 1);
-    LocalResource rsrczip = createZipFile(files, p, size, rand, vis);
-    Path destPathjar = dirs.getLocalPathForWrite(basedir.toString(), size, conf);
-    destPathjar = new Path (destPathjar,
-        Long.toString(uniqueNumberGenerator.incrementAndGet()));
-    FSDownload fsdzip = new FSDownload(files,
-        UserGroupInformation.getCurrentUser(), conf, destPathjar, rsrczip);
-    pending.put(rsrczip, exec.submit(fsdzip));
-    exec.shutdown();
-    while (!exec.awaitTermination(1000, TimeUnit.MILLISECONDS));
-    Assert.assertTrue(pending.get(rsrczip).isDone());
+    downloadWithFileType(TEST_FILE_TYPE.ZIP);
+  }
 
-    try {
-      FileStatus[] filesstatus = files.getDefaultFileSystem().listStatus(
-          basedir);
-      for (FileStatus filestatus : filesstatus) {
-        if (filestatus.isDirectory()) {
-          FileStatus[] childFiles = files.getDefaultFileSystem().listStatus(
-              filestatus.getPath());
-          for (FileStatus childfile : childFiles) {
-            if (childfile.getPath().getName().equalsIgnoreCase("1.gz.tmp")) {
-              Assert.fail("Tmp File should not have been there "
-                  + childfile.getPath());
-            }
-          }
-        }
-      }
-    }catch (Exception e) {
-      throw new IOException("Failed exec", e);
-    }
+  @Test (timeout=10000)
+  public void testDownloadArchiveTgz() throws IOException, URISyntaxException,
+      InterruptedException {
+    downloadWithFileType(TEST_FILE_TYPE.TGZ);
   }
-  
+
   private void verifyPermsRecursively(FileSystem fs,
       FileContext files, Path p,
       LocalResourceVisibility vis) throws IOException {
@@ -527,7 +484,7 @@ public class TestFSDownload {
       }
     }      
   }
-  
+
   @Test (timeout=10000)
   public void testDirDownload() throws IOException, InterruptedException {
     Configuration conf = new Configuration();

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestRackResolver.java Wed Oct 16 21:07:28 2013
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.junit.Assert;
 import org.junit.Test;
@@ -35,6 +36,8 @@ import org.junit.Test;
 public class TestRackResolver {
 
   private static Log LOG = LogFactory.getLog(TestRackResolver.class);
+  private static final String invalidHost = "invalidHost";
+
 
   public static final class MyResolver implements DNSToSwitchMapping {
 
@@ -50,6 +53,11 @@ public class TestRackResolver {
       if (hostList.isEmpty()) {
         return returnList;
       }
+      if (hostList.get(0).equals(invalidHost)) {
+        // Simulate condition where resolving host returns null
+        return null; 
+      }
+        
       LOG.info("Received resolve request for "
           + hostList.get(0));
       if (hostList.get(0).equals("host1")
@@ -90,6 +98,8 @@ public class TestRackResolver {
     Assert.assertEquals("/rack1", node.getNetworkLocation());
     node = RackResolver.resolve("host1");
     Assert.assertEquals("/rack1", node.getNetworkLocation());
+    node = RackResolver.resolve(invalidHost);
+    Assert.assertEquals(NetworkTopology.DEFAULT_RACK, node.getNetworkLocation());
   }
 
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java Wed Oct 16 21:07:28 2013
@@ -31,13 +31,17 @@ public class ServerRMProxy<T> extends RM
 
   private static final Log LOG = LogFactory.getLog(ServerRMProxy.class);
 
-  public static <T> T createRMProxy(final Configuration conf,
+  public static <T> T createRMProxy(final Configuration configuration,
       final Class<T> protocol) throws IOException {
+    YarnConfiguration conf = (configuration instanceof YarnConfiguration)
+        ? (YarnConfiguration) configuration
+        : new YarnConfiguration(configuration);
     InetSocketAddress rmAddress = getRMAddress(conf, protocol);
     return createRMProxy(conf, protocol, rmAddress);
   }
 
-  private static InetSocketAddress getRMAddress(Configuration conf, Class<?> protocol) {
+  private static InetSocketAddress getRMAddress(YarnConfiguration conf,
+                                                Class<?> protocol) {
     if (protocol == ResourceTracker.class) {
       return conf.getSocketAddr(
         YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedAppsEvent.java Wed Oct 16 21:07:28 2013
@@ -27,13 +27,31 @@ import org.apache.hadoop.yarn.server.nod
 public class CMgrCompletedAppsEvent extends ContainerManagerEvent {
 
   private final List<ApplicationId> appsToCleanup;
+  private final Reason reason;
 
-  public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup) {
+  public CMgrCompletedAppsEvent(List<ApplicationId> appsToCleanup, Reason reason) {
     super(ContainerManagerEventType.FINISH_APPS);
     this.appsToCleanup = appsToCleanup;
+    this.reason = reason;
   }
 
   public List<ApplicationId> getAppsToCleanup() {
     return this.appsToCleanup;
   }
+
+  public Reason getReason() {
+    return reason;
+  }
+
+  public static enum Reason {
+    /**
+     * Application is killed as NodeManager is shut down
+     */
+    ON_SHUTDOWN, 
+
+    /**
+     * Application is killed by ResourceManager
+     */
+    BY_RESOURCEMANAGER
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/CMgrCompletedContainersEvent.java Wed Oct 16 21:07:28 2013
@@ -24,10 +24,11 @@ import org.apache.hadoop.yarn.api.record
 
 public class CMgrCompletedContainersEvent extends ContainerManagerEvent {
 
-  private List<ContainerId> containerToCleanup;
-  private Reason reason;
-  
-  public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup, Reason reason) {
+  private final List<ContainerId> containerToCleanup;
+  private final Reason reason;
+
+  public CMgrCompletedContainersEvent(List<ContainerId> containersToCleanup,
+                                      Reason reason) {
     super(ContainerManagerEventType.FINISH_CONTAINERS);
     this.containerToCleanup = containersToCleanup;
     this.reason = reason;
@@ -36,12 +37,27 @@ public class CMgrCompletedContainersEven
   public List<ContainerId> getContainersToCleanup() {
     return this.containerToCleanup;
   }
-  
+
   public Reason getReason() {
     return reason;
   }
-  
+
   public static enum Reason {
-    ON_SHUTDOWN, BY_RESOURCEMANAGER
+    /**
+     * Container is killed as NodeManager is shutting down
+     */
+    ON_SHUTDOWN,
+
+    /**
+     * Container is killed as the NodeManager is re-syncing with the
+     * ResourceManager
+     */
+    ON_NODEMANAGER_RESYNC,
+
+    /**
+     * Container is killed on request by the ResourceManager
+     */
+    BY_RESOURCEMANAGER
   }
+
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/ContainerExecutor.java Wed Oct 16 21:07:28 2013
@@ -218,7 +218,7 @@ public abstract class ContainerExecutor 
         retCommand.addAll(Arrays.asList("nice", "-n",
             Integer.toString(containerSchedPriorityAdjustment)));
       }
-      retCommand.addAll(Arrays.asList("bash", "-c", command));
+      retCommand.addAll(Arrays.asList("bash", command));
       return retCommand.toArray(new String[retCommand.size()]);
     }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DefaultContainerExecutor.java Wed Oct 16 21:07:28 2013
@@ -277,7 +277,7 @@ public class DefaultContainerExecutor ex
       pout.println("echo $$ > " + pidFile.toString() + ".tmp");
       pout.println("/bin/mv -f " + pidFile.toString() + ".tmp " + pidFile);
       String exec = Shell.isSetsidAvailable? "exec setsid" : "exec";
-      pout.println(exec + " /bin/bash -c \"" +
+      pout.println(exec + " /bin/bash \"" +
         launchDst.toUri().getPath().toString() + "\"");
     }
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LinuxContainerExecutor.java Wed Oct 16 21:07:28 2013
@@ -24,11 +24,13 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -48,6 +50,8 @@ public class LinuxContainerExecutor exte
   private static final Log LOG = LogFactory
       .getLog(LinuxContainerExecutor.class);
 
+  private String nonsecureLocalUser;
+  private Pattern nonsecureLocalUserPattern;
   private String containerExecutorExe;
   private LCEResourcesHandler resourcesHandler;
   private boolean containerSchedPriorityIsSet = false;
@@ -70,6 +74,24 @@ public class LinuxContainerExecutor exte
           .getInt(YarnConfiguration.NM_CONTAINER_EXECUTOR_SCHED_PRIORITY, 
           YarnConfiguration.DEFAULT_NM_CONTAINER_EXECUTOR_SCHED_PRIORITY);
     }
+    nonsecureLocalUser = conf.get(
+        YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
+        YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
+    nonsecureLocalUserPattern = Pattern.compile(
+        conf.get(YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY,
+            YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_USER_PATTERN));        
+  }
+
+  void verifyUsernamePattern(String user) {
+    if (!UserGroupInformation.isSecurityEnabled() &&
+        !nonsecureLocalUserPattern.matcher(user).matches()) {
+        throw new IllegalArgumentException("Invalid user name '" + user + "'," +
+            " it must match '" + nonsecureLocalUserPattern.pattern() + "'");
+      }
+  }
+
+  String getRunAsUser(String user) {
+    return UserGroupInformation.isSecurityEnabled() ? user : nonsecureLocalUser;
   }
 
   /**
@@ -162,9 +184,12 @@ public class LinuxContainerExecutor exte
       List<String> localDirs, List<String> logDirs)
       throws IOException, InterruptedException {
 
+    verifyUsernamePattern(user);
+    String runAsUser = getRunAsUser(user);
     List<String> command = new ArrayList<String>();
     addSchedPriorityCommand(command);
     command.addAll(Arrays.asList(containerExecutorExe,
+                   runAsUser,
                    user, 
                    Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
                    appId,
@@ -218,6 +243,9 @@ public class LinuxContainerExecutor exte
       String user, String appId, Path containerWorkDir,
       List<String> localDirs, List<String> logDirs) throws IOException {
 
+    verifyUsernamePattern(user);
+    String runAsUser = getRunAsUser(user);
+
     ContainerId containerId = container.getContainerId();
     String containerIdStr = ConverterUtils.toString(containerId);
     
@@ -234,7 +262,7 @@ public class LinuxContainerExecutor exte
         List<String> command = new ArrayList<String>();
         addSchedPriorityCommand(command);
         command.addAll(Arrays.asList(
-            containerExecutorExe, user, Integer
+            containerExecutorExe, runAsUser, user, Integer
                 .toString(Commands.LAUNCH_CONTAINER.getValue()), appId,
             containerIdStr, containerWorkDir.toString(),
             nmPrivateCotainerScriptPath.toUri().getPath().toString(),
@@ -293,8 +321,12 @@ public class LinuxContainerExecutor exte
   public boolean signalContainer(String user, String pid, Signal signal)
       throws IOException {
 
+    verifyUsernamePattern(user);
+    String runAsUser = getRunAsUser(user);
+
     String[] command =
         new String[] { containerExecutorExe,
+                   runAsUser,
                    user,
                    Integer.toString(Commands.SIGNAL_CONTAINER.getValue()),
                    pid,
@@ -322,8 +354,12 @@ public class LinuxContainerExecutor exte
 
   @Override
   public void deleteAsUser(String user, Path dir, Path... baseDirs) {
+    verifyUsernamePattern(user);
+    String runAsUser = getRunAsUser(user);
+
     List<String> command = new ArrayList<String>(
         Arrays.asList(containerExecutorExe,
+                    runAsUser,
                     user,
                     Integer.toString(Commands.DELETE_AS_USER.getValue()),
                     dir == null ? "" : dir.toUri().getPath()));

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed Oct 16 21:07:28 2013
@@ -19,9 +19,6 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -31,6 +28,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.http.HttpConfig.Policy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
@@ -67,11 +66,6 @@ public class NodeManager extends Composi
    * Priority of the NodeManager shutdown hook.
    */
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
-  
-  /**
-   * Extra duration to wait for containers to be killed on shutdown.
-   */
-  private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
 
   private static final Log LOG = LogFactory.getLog(NodeManager.class);
   protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
@@ -84,8 +78,6 @@ public class NodeManager extends Composi
   private NodeStatusUpdater nodeStatusUpdater;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook; 
   
-  private long waitForContainersOnShutdownMillis;
-  
   private AtomicBoolean isStopping = new AtomicBoolean(false);
   
   public NodeManager() {
@@ -193,13 +185,6 @@ public class NodeManager extends Composi
     // so that we make sure everything is up before registering with RM. 
     addService(nodeStatusUpdater);
     
-    waitForContainersOnShutdownMillis =
-        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
-            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) + 
-        conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
-            YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
-        SHUTDOWN_CLEANUP_SLOP_MS;
-    
     super.serviceInit(conf);
     // TODO add local dirs to del
   }
@@ -219,9 +204,6 @@ public class NodeManager extends Composi
     if (isStopping.getAndSet(true)) {
       return;
     }
-    if (context != null) {
-      cleanupContainers(NodeManagerEventType.SHUTDOWN);
-    }
     super.serviceStop();
     DefaultMetricsSystem.shutdown();
   }
@@ -246,68 +228,13 @@ public class NodeManager extends Composi
       public void run() {
         LOG.info("Notifying ContainerManager to block new container-requests");
         containerManager.setBlockNewContainerRequests(true);
-        cleanupContainers(NodeManagerEventType.RESYNC);
+        LOG.info("Cleaning up running containers on resync");
+        containerManager.cleanupContainersOnNMResync();
         ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
       }
     }.start();
   }
 
-  @SuppressWarnings("unchecked")
-  protected void cleanupContainers(NodeManagerEventType eventType) {
-    Map<ContainerId, Container> containers = context.getContainers();
-    if (containers.isEmpty()) {
-      return;
-    }
-    LOG.info("Containers still running on " + eventType + " : "
-        + containers.keySet());
-    
-    List<ContainerId> containerIds =
-        new ArrayList<ContainerId>(containers.keySet());
-    dispatcher.getEventHandler().handle(
-        new CMgrCompletedContainersEvent(containerIds, 
-            CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
-    
-    LOG.info("Waiting for containers to be killed");
-    
-    switch (eventType) {
-    case SHUTDOWN:
-      long waitStartTime = System.currentTimeMillis();
-      while (!containers.isEmpty()
-          && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
-        try {
-          //To remove done containers in NM context
-          nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
-          Thread.sleep(1000);
-        } catch (InterruptedException ex) {
-          LOG.warn("Interrupted while sleeping on container kill on shutdown",
-            ex);
-        }
-      }
-      break;
-    case RESYNC:
-      while (!containers.isEmpty()) {
-        try {
-          Thread.sleep(1000);
-          nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
-        } catch (InterruptedException ex) {
-          LOG.warn("Interrupted while sleeping on container kill on resync",
-            ex);
-        }
-      }
-      break;
-    default:
-      LOG.warn("Invalid eventType: " + eventType);
-    }
-
-    // All containers killed
-    if (containers.isEmpty()) {
-      LOG.info("All containers in DONE state");
-    } else {
-      LOG.info("Done waiting for containers to be killed. Still alive: " + 
-          containers.keySet());
-    }
-  }
-
   public static class NMContext implements Context {
 
     private NodeId nodeId = null;
@@ -470,9 +397,16 @@ public class NodeManager extends Composi
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
     NodeManager nodeManager = new NodeManager();
     Configuration conf = new YarnConfiguration();
+    setHttpPolicy(conf);
     nodeManager.initAndStartNodeManager(conf, false);
   }
   
+  private static void setHttpPolicy(Configuration conf) {
+    HttpConfig.setPolicy(Policy.fromString(conf.get(
+      YarnConfiguration.YARN_HTTP_POLICY_KEY,
+      YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
+  }
+
   @VisibleForTesting
   @Private
   public NodeStatusUpdater getNodeStatusUpdater() {

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Oct 16 21:07:28 2013
@@ -499,18 +499,19 @@ public class NodeStatusUpdaterImpl exten
             lastHeartBeatID = response.getResponseId();
             List<ContainerId> containersToCleanup = response
                 .getContainersToCleanup();
-            if (containersToCleanup.size() != 0) {
+            if (!containersToCleanup.isEmpty()) {
               dispatcher.getEventHandler().handle(
-                  new CMgrCompletedContainersEvent(containersToCleanup, 
-                      CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
+                  new CMgrCompletedContainersEvent(containersToCleanup,
+                    CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
             }
             List<ApplicationId> appsToCleanup =
                 response.getApplicationsToCleanup();
             //Only start tracking for keepAlive on FINISH_APP
             trackAppsForKeepAlive(appsToCleanup);
-            if (appsToCleanup.size() != 0) {
+            if (!appsToCleanup.isEmpty()) {
               dispatcher.getEventHandler().handle(
-                  new CMgrCompletedAppsEvent(appsToCleanup));
+                  new CMgrCompletedAppsEvent(appsToCleanup,
+                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
             }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Wed Oct 16 21:07:28 2013
@@ -175,39 +175,56 @@ public class AuxServices extends Abstrac
     LOG.info("Got event " + event.getType() + " for appId "
         + event.getApplicationID());
     switch (event.getType()) {
-    case APPLICATION_INIT:
-      LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
-      AuxiliaryService service = serviceMap.get(event.getServiceID());
-      if (null == service) {
-        LOG.info("service is null");
-        // TODO kill all containers waiting on Application
-        return;
-      }
-      service.initializeApplication(new ApplicationInitializationContext(event
-        .getUser(), event.getApplicationID(), event.getServiceData()));
-      break;
-    case APPLICATION_STOP:
-      for (AuxiliaryService serv : serviceMap.values()) {
-        serv.stopApplication(new ApplicationTerminationContext(event
-          .getApplicationID()));
-      }
-      break;
-    case CONTAINER_INIT:
-      for (AuxiliaryService serv : serviceMap.values()) {
-        serv.initializeContainer(new ContainerInitializationContext(
-            event.getUser(), event.getContainer().getContainerId(),
-            event.getContainer().getResource()));
-      }
-      break;
-    case CONTAINER_STOP:
-      for (AuxiliaryService serv : serviceMap.values()) {
-        serv.stopContainer(new ContainerTerminationContext(
-            event.getUser(), event.getContainer().getContainerId(),
-            event.getContainer().getResource()));
-      }
-      break;
+      case APPLICATION_INIT:
+        LOG.info("Got APPLICATION_INIT for service " + event.getServiceID());
+        AuxiliaryService service = null;
+        try {
+          service = serviceMap.get(event.getServiceID());
+          service
+              .initializeApplication(new ApplicationInitializationContext(event
+                  .getUser(), event.getApplicationID(), event.getServiceData()));
+        } catch (Throwable th) {
+          logWarningWhenAuxServiceThrowExceptions(service,
+              AuxServicesEventType.APPLICATION_INIT, th);
+        }
+        break;
+      case APPLICATION_STOP:
+        for (AuxiliaryService serv : serviceMap.values()) {
+          try {
+            serv.stopApplication(new ApplicationTerminationContext(event
+                .getApplicationID()));
+          } catch (Throwable th) {
+            logWarningWhenAuxServiceThrowExceptions(serv,
+                AuxServicesEventType.APPLICATION_STOP, th);
+          }
+        }
+        break;
+      case CONTAINER_INIT:
+        for (AuxiliaryService serv : serviceMap.values()) {
+          try {
+            serv.initializeContainer(new ContainerInitializationContext(
+                event.getUser(), event.getContainer().getContainerId(),
+                event.getContainer().getResource()));
+          } catch (Throwable th) {
+            logWarningWhenAuxServiceThrowExceptions(serv,
+                AuxServicesEventType.CONTAINER_INIT, th);
+          }
+        }
+        break;
+      case CONTAINER_STOP:
+        for (AuxiliaryService serv : serviceMap.values()) {
+          try {
+            serv.stopContainer(new ContainerTerminationContext(
+                event.getUser(), event.getContainer().getContainerId(),
+                event.getContainer().getResource()));
+          } catch (Throwable th) {
+            logWarningWhenAuxServiceThrowExceptions(serv,
+                AuxServicesEventType.CONTAINER_STOP, th);
+          }
+        }
+        break;
       default:
-      throw new RuntimeException("Unknown type: " + event.getType());
+        throw new RuntimeException("Unknown type: " + event.getType());
     }
   }
 
@@ -217,4 +234,11 @@ public class AuxServices extends Abstrac
     }
     return p.matcher(name).matches();
   }
+
+  private void logWarningWhenAuxServiceThrowExceptions(AuxiliaryService service,
+      AuxServicesEventType eventType, Throwable th) {
+    LOG.warn((null == service ? "The auxService is null"
+        : "The auxService name is " + service.getName())
+        + " and it got an error at event: " + eventType, th);
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Oct 16 21:07:28 2013
@@ -30,6 +30,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,9 +70,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
 import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -119,6 +124,11 @@ public class ContainerManagerImpl extend
     ServiceStateChangeListener, ContainerManagementProtocol,
     EventHandler<ContainerManagerEvent> {
 
+  /**
+   * Extra duration to wait for applications to be killed on shutdown.
+   */
+  private static final int SHUTDOWN_CLEANUP_SLOP_MS = 1000;
+
   private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
 
   final Context context;
@@ -137,6 +147,11 @@ public class ContainerManagerImpl extend
 
   private final DeletionService deletionService;
   private AtomicBoolean blockNewContainerRequests = new AtomicBoolean(false);
+  private boolean serviceStopped = false;
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+
+  private long waitForContainersOnShutdownMillis;
 
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -180,6 +195,10 @@ public class ContainerManagerImpl extend
     dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
     
     addService(dispatcher);
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
   }
 
   @Override
@@ -189,6 +208,13 @@ public class ContainerManagerImpl extend
     addIfService(logHandler);
     dispatcher.register(LogHandlerEventType.class, logHandler);
     
+    waitForContainersOnShutdownMillis =
+        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
+            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
+        conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
+            YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
+        SHUTDOWN_CLEANUP_SLOP_MS;
+
     super.serviceInit(conf);
   }
 
@@ -274,6 +300,16 @@ public class ContainerManagerImpl extend
 
   @Override
   public void serviceStop() throws Exception {
+    setBlockNewContainerRequests(true);
+    this.writeLock.lock();
+    try {
+      serviceStopped = true;
+      if (context != null) {
+        cleanUpApplicationsOnNMShutDown();
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
     if (auxiliaryServices.getServiceState() == STARTED) {
       auxiliaryServices.unregisterServiceListener(this);
     }
@@ -283,6 +319,76 @@ public class ContainerManagerImpl extend
     super.serviceStop();
   }
 
+  public void cleanUpApplicationsOnNMShutDown() {
+    Map<ApplicationId, Application> applications =
+        this.context.getApplications();
+    if (applications.isEmpty()) {
+      return;
+    }
+    LOG.info("Applications still running : " + applications.keySet());
+
+    List<ApplicationId> appIds =
+        new ArrayList<ApplicationId>(applications.keySet());
+    this.handle(
+        new CMgrCompletedAppsEvent(appIds,
+            CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
+
+    LOG.info("Waiting for Applications to be Finished");
+
+    long waitStartTime = System.currentTimeMillis();
+    while (!applications.isEmpty()
+        && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ex) {
+        LOG.warn(
+          "Interrupted while sleeping on applications finish on shutdown", ex);
+      }
+    }
+
+    // All applications Finished
+    if (applications.isEmpty()) {
+      LOG.info("All applications in FINISHED state");
+    } else {
+      LOG.info("Done waiting for Applications to be Finished. Still alive: " +
+          applications.keySet());
+    }
+  }
+
+  public void cleanupContainersOnNMResync() {
+    Map<ContainerId, Container> containers = context.getContainers();
+    if (containers.isEmpty()) {
+      return;
+    }
+    LOG.info("Containers still running on "
+        + CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC + " : "
+        + containers.keySet());
+
+    List<ContainerId> containerIds =
+      new ArrayList<ContainerId>(containers.keySet());
+
+    LOG.info("Waiting for containers to be killed");
+
+    this.handle(new CMgrCompletedContainersEvent(containerIds,
+      CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
+    while (!containers.isEmpty()) {
+      try {
+        Thread.sleep(1000);
+        nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
+      } catch (InterruptedException ex) {
+        LOG.warn("Interrupted while sleeping on container kill on resync", ex);
+      }
+    }
+
+    // All containers killed
+    if (containers.isEmpty()) {
+      LOG.info("All containers in DONE state");
+    } else {
+      LOG.info("Done waiting for containers to be killed. Still alive: " +
+        containers.keySet());
+    }
+  }
+
   // Get the remoteUGI corresponding to the api call.
   protected UserGroupInformation getRemoteUgi()
       throws YarnException {
@@ -451,6 +557,18 @@ public class ContainerManagerImpl extend
 
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
 
+    Map<String, ByteBuffer> serviceData = getAuxServiceMetaData();
+    if (launchContext.getServiceData()!=null && 
+        !launchContext.getServiceData().isEmpty()) {
+      for (Map.Entry<String, ByteBuffer> meta : launchContext.getServiceData()
+          .entrySet()) {
+        if (null == serviceData.get(meta.getKey())) {
+          throw new InvalidAuxServiceException("The auxService:" + meta.getKey()
+              + " does not exist");
+        }
+      }
+    }
+
     Credentials credentials = parseCredentials(launchContext);
 
     Container container =
@@ -466,29 +584,40 @@ public class ContainerManagerImpl extend
           + " already is running on this node!!");
     }
 
-    // Create the application
-    Application application =
-        new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
-    if (null == context.getApplications().putIfAbsent(applicationID,
-      application)) {
-      LOG.info("Creating a new application reference for app " + applicationID);
-
-      dispatcher.getEventHandler().handle(
-        new ApplicationInitEvent(applicationID, container.getLaunchContext()
-          .getApplicationACLs()));
-    }
+    this.readLock.lock();
+    try {
+      if (!serviceStopped) {
+        // Create the application
+        Application application =
+            new ApplicationImpl(dispatcher, user, applicationID, credentials, context);
+        if (null == context.getApplications().putIfAbsent(applicationID,
+          application)) {
+          LOG.info("Creating a new application reference for app " + applicationID);
+
+          dispatcher.getEventHandler().handle(
+            new ApplicationInitEvent(applicationID, container.getLaunchContext()
+              .getApplicationACLs()));
+        }
 
-    dispatcher.getEventHandler().handle(
-      new ApplicationContainerInitEvent(container));
+        dispatcher.getEventHandler().handle(
+          new ApplicationContainerInitEvent(container));
 
-    this.context.getContainerTokenSecretManager().startContainerSuccessful(
-      containerTokenIdentifier);
-    NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
-      "ContainerManageImpl", applicationID, containerId);
-    // TODO launchedContainer misplaced -> doesn't necessarily mean a container
-    // launch. A finished Application will not launch containers.
-    metrics.launchedContainer();
-    metrics.allocateContainer(containerTokenIdentifier.getResource()); 
+        this.context.getContainerTokenSecretManager().startContainerSuccessful(
+          containerTokenIdentifier);
+        NMAuditLogger.logSuccess(user, AuditConstants.START_CONTAINER,
+          "ContainerManageImpl", applicationID, containerId);
+        // TODO launchedContainer misplaced -> doesn't necessarily mean a container
+        // launch. A finished Application will not launch containers.
+        metrics.launchedContainer();
+        metrics.allocateContainer(containerTokenIdentifier.getResource());
+      } else {
+        throw new YarnException(
+            "Container start failed as the NodeManager is " +
+            "in the process of shutting down");
+      }
+    } finally {
+      this.readLock.unlock();
+    }
   }
 
   protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
@@ -713,9 +842,15 @@ public class ContainerManagerImpl extend
       CMgrCompletedAppsEvent appsFinishedEvent =
           (CMgrCompletedAppsEvent) event;
       for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
+        String diagnostic = "";
+        if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN) {
+          diagnostic = "Application killed on shutdown";
+        } else if (appsFinishedEvent.getReason() == CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER) {
+          diagnostic = "Application killed by ResourceManager";
+        }
         this.dispatcher.getEventHandler().handle(
             new ApplicationFinishEvent(appID,
-                "Application Killed by ResourceManager"));
+                diagnostic));
       }
       break;
     case FINISH_CONTAINERS:
@@ -723,20 +858,14 @@ public class ContainerManagerImpl extend
           (CMgrCompletedContainersEvent) event;
       for (ContainerId container : containersFinishedEvent
           .getContainersToCleanup()) {
-        String diagnostic = "";
-        if (containersFinishedEvent.getReason() == 
-            CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN) {
-          diagnostic = "Container Killed on Shutdown";
-        } else if (containersFinishedEvent.getReason() == 
-            CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER) {
-          diagnostic = "Container Killed by ResourceManager";
-        }
-        this.dispatcher.getEventHandler().handle(
-            new ContainerKillEvent(container, diagnostic));
+          this.dispatcher.getEventHandler().handle(
+              new ContainerKillEvent(container,
+                  "Container Killed by ResourceManager"));
       }
       break;
     default:
-      LOG.warn("Invalid event " + event.getType() + ". Ignoring.");
+        throw new YarnRuntimeException(
+            "Got an unknown ContainerManagerEvent type: " + event.getType());
     }
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Wed Oct 16 21:07:28 2013
@@ -177,6 +177,13 @@ public class ApplicationImpl implements 
                    ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
                ApplicationEventType.APPLICATION_CONTAINER_FINISHED,
                new AppFinishTransition())
+          .addTransition(ApplicationState.FINISHING_CONTAINERS_WAIT,
+              ApplicationState.FINISHING_CONTAINERS_WAIT,
+              EnumSet.of(
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
+                  ApplicationEventType.APPLICATION_INITED,
+                  ApplicationEventType.FINISH_APPLICATION))
 
            // Transitions from APPLICATION_RESOURCES_CLEANINGUP state
            .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
@@ -186,12 +193,25 @@ public class ApplicationImpl implements 
                ApplicationState.FINISHED,
                ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
                new AppCompletelyDoneTransition())
+          .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+              ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
+              EnumSet.of(
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+                  ApplicationEventType.APPLICATION_INITED,
+                  ApplicationEventType.FINISH_APPLICATION))
            
            // Transitions from FINISHED state
            .addTransition(ApplicationState.FINISHED,
                ApplicationState.FINISHED,
                ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
                new AppLogsAggregatedTransition())
+           .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
+               EnumSet.of(
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
+                  ApplicationEventType.FINISH_APPLICATION))
                
            // create the topology tables
            .installTopology();
@@ -343,7 +363,7 @@ public class ApplicationImpl implements 
     @Override
     public ApplicationState transition(ApplicationImpl app,
         ApplicationEvent event) {
-
+      ApplicationFinishEvent appEvent = (ApplicationFinishEvent)event;
       if (app.containers.isEmpty()) {
         // No container to cleanup. Cleanup app level resources.
         app.handleAppFinishWithContainersCleanedup();
@@ -355,7 +375,7 @@ public class ApplicationImpl implements 
       for (ContainerId containerID : app.containers.keySet()) {
         app.dispatcher.getEventHandler().handle(
             new ContainerKillEvent(containerID,
-                "Container killed on application-finish event from RM."));
+                "Container killed on application-finish event: " + appEvent.getDiagnostic()));
       }
       return ApplicationState.FINISHING_CONTAINERS_WAIT;
     }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Oct 16 21:07:28 2013
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.util.ProcessIdFileReader;
@@ -133,10 +134,22 @@ public class ContainerLaunch implements 
     final List<String> command = launchContext.getCommands();
     int ret = -1;
 
+    // CONTAINER_KILLED_ON_REQUEST should not be missed if the container
+    // is already at KILLING
+    if (container.getContainerState() == ContainerState.KILLING) {
+      dispatcher.getEventHandler().handle(
+          new ContainerExitEvent(containerID,
+              ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
+              Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
+                  ExitCode.TERMINATED.getExitCode(),
+              "Container terminated before launch."));
+      return 0;
+    }
+
     try {
       localResources = container.getLocalizedResources();
       if (localResources == null) {
-        RPCUtil.getRemoteException(
+        throw RPCUtil.getRemoteException(
             "Unable to get local resources when Container " + containerID +
             " is at " + container.getContainerState());
       }

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Wed Oct 16 21:07:28 2013
@@ -75,20 +75,9 @@ public class ContainersLauncher extends 
         new ThreadFactoryBuilder()
           .setNameFormat("ContainersLauncher #%d")
           .build());
-  private final Map<ContainerId,RunningContainer> running =
-    Collections.synchronizedMap(new HashMap<ContainerId,RunningContainer>());
-
-  private static final class RunningContainer {
-    public RunningContainer(Future<Integer> submit,
-        ContainerLaunch launcher) {
-      this.runningcontainer = submit;
-      this.launcher = launcher;
-    }
-
-    Future<Integer> runningcontainer;
-    ContainerLaunch launcher;
-  }
-
+  @VisibleForTesting
+  public final Map<ContainerId, ContainerLaunch> running =
+    Collections.synchronizedMap(new HashMap<ContainerId, ContainerLaunch>());
 
   public ContainersLauncher(Context context, Dispatcher dispatcher,
       ContainerExecutor exec, LocalDirsHandlerService dirsHandler,
@@ -133,38 +122,20 @@ public class ContainersLauncher extends 
         ContainerLaunch launch =
             new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
               event.getContainer(), dirsHandler, containerManager);
-        running.put(containerId,
-            new RunningContainer(containerLauncher.submit(launch), 
-                launch));
+        containerLauncher.submit(launch);
+        running.put(containerId, launch);
         break;
       case CLEANUP_CONTAINER:
-        RunningContainer rContainerDatum = running.remove(containerId);
-        if (rContainerDatum == null) {
+        ContainerLaunch launcher = running.remove(containerId);
+        if (launcher == null) {
           // Container not launched. So nothing needs to be done.
           return;
         }
-        Future<Integer> rContainer = rContainerDatum.runningcontainer;
-        if (rContainer != null 
-            && !rContainer.isDone()) {
-          // Cancel the future so that it won't be launched if it isn't already.
-          // If it is going to be canceled, make sure CONTAINER_KILLED_ON_REQUEST
-          // will not be missed if the container is already at KILLING
-          if (rContainer.cancel(false)) {
-            if (container.getContainerState() == ContainerState.KILLING) {
-              dispatcher.getEventHandler().handle(
-                  new ContainerExitEvent(containerId,
-                      ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
-                      Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() : 
-                        ExitCode.TERMINATED.getExitCode(),
-                      "Container terminated before launch."));
-            }
-          }
-        }
 
         // Cleanup a container whether it is running/killed/completed, so that
         // no sub-processes are alive.
         try {
-          rContainerDatum.launcher.cleanupContainer();
+          launcher.cleanupContainer();
         } catch (IOException e) {
           LOG.warn("Got exception while cleaning container " + containerId
               + ". Ignoring.");

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Wed Oct 16 21:07:28 2013
@@ -65,7 +65,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
-import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.FSDownload;
 
@@ -130,9 +129,12 @@ public class ContainerLocalizer {
     try {
       // assume credentials in cwd
       // TODO: Fix
-      credFile = lfs.open(
-          new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId)));
+      Path tokenPath =
+          new Path(String.format(TOKEN_FILE_NAME_FMT, localizerId));
+      credFile = lfs.open(tokenPath);
       creds.readTokenStorageStream(credFile);
+      // Explicitly deleting token file.
+      lfs.delete(tokenPath, false);      
     } finally  {
       if (credFile != null) {
         credFile.close();

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Oct 16 21:07:28 2013
@@ -1017,6 +1017,7 @@ public class ResourceLocalizationService
           }
         }
         if (UserGroupInformation.isSecurityEnabled()) {
+          credentials = new Credentials(credentials);
           LocalizerTokenIdentifier id = secretManager.createIdentifier();
           Token<LocalizerTokenIdentifier> localizerToken =
               new Token<LocalizerTokenIdentifier>(id, secretManager);

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/util/CgroupsLCEResourcesHandler.java Wed Oct 16 21:07:28 2013
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,6 +41,8 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
 
 public class CgroupsLCEResourcesHandler implements LCEResourcesHandler {
 
@@ -59,8 +62,13 @@ public class CgroupsLCEResourcesHandler 
   private final int CPU_DEFAULT_WEIGHT = 1024; // set by kernel
   private final Map<String, String> controllerPaths; // Controller -> path
 
+  private long deleteCgroupTimeout;
+  // package private for testing purposes
+  Clock clock;
+  
   public CgroupsLCEResourcesHandler() {
     this.controllerPaths = new HashMap<String, String>();
+    clock = new SystemClock();
   }
 
   @Override
@@ -73,7 +81,8 @@ public class CgroupsLCEResourcesHandler 
     return conf;
   }
 
-  public synchronized void init(LinuxContainerExecutor lce) throws IOException {
+  @VisibleForTesting
+  void initConfig() throws IOException {
 
     this.cgroupPrefix = conf.get(YarnConfiguration.
             NM_LINUX_CONTAINER_CGROUPS_HIERARCHY, "/hadoop-yarn");
@@ -82,6 +91,9 @@ public class CgroupsLCEResourcesHandler 
     this.cgroupMountPath = conf.get(YarnConfiguration.
             NM_LINUX_CONTAINER_CGROUPS_MOUNT_PATH, null);
 
+    this.deleteCgroupTimeout = conf.getLong(
+        YarnConfiguration.NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT,
+        YarnConfiguration.DEFAULT_NM_LINUX_CONTAINER_CGROUPS_DELETE_TIMEOUT);
     // remove extra /'s at end or start of cgroupPrefix
     if (cgroupPrefix.charAt(0) == '/') {
       cgroupPrefix = cgroupPrefix.substring(1);
@@ -91,7 +103,11 @@ public class CgroupsLCEResourcesHandler 
     if (cgroupPrefix.charAt(len - 1) == '/') {
       cgroupPrefix = cgroupPrefix.substring(0, len - 1);
     }
+  }
   
+  public void init(LinuxContainerExecutor lce) throws IOException {
+    initConfig();
+    
     // mount cgroups if requested
     if (cgroupMount && cgroupMountPath != null) {
       ArrayList<String> cgroupKVs = new ArrayList<String>();
@@ -158,14 +174,32 @@ public class CgroupsLCEResourcesHandler 
     }
   }
 
-  private void deleteCgroup(String controller, String groupName) {
-    String path = pathForCgroup(controller, groupName);
+  @VisibleForTesting
+  boolean deleteCgroup(String cgroupPath) {
+    boolean deleted;
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("deleteCgroup: " + cgroupPath);
+    }
 
-    LOG.debug("deleteCgroup: " + path);
+    long start = clock.getTime();
+    do {
+      deleted = new File(cgroupPath).delete();
+      if (!deleted) {
+        try {
+          Thread.sleep(20);
+        } catch (InterruptedException ex) {
+          // NOP        
+        }
+      }
+    } while (!deleted && (clock.getTime() - start) < deleteCgroupTimeout);
 
-    if (! new File(path).delete()) {
-      LOG.warn("Unable to delete cgroup at: " + path);
+    if (!deleted) {
+      LOG.warn("Unable to delete cgroup at: " + cgroupPath +
+          ", tried to delete for " + deleteCgroupTimeout + "ms");
     }
+
+    return deleted;
   }
 
   /*
@@ -185,21 +219,8 @@ public class CgroupsLCEResourcesHandler 
   }
 
   private void clearLimits(ContainerId containerId) {
-    String containerName = containerId.toString();
-
-    // Based on testing, ApplicationMaster executables don't terminate until
-    // a little after the container appears to have finished. Therefore, we
-    // wait a short bit for the cgroup to become empty before deleting it.
-    if (containerId.getId() == 1) {
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        // not a problem, continue anyway
-      }
-    }
-
     if (isCpuWeightEnabled()) {
-      deleteCgroup(CONTROLLER_CPU, containerName);
+      deleteCgroup(pathForCgroup(CONTROLLER_CPU, containerId.toString()));
     }
   }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1532910&r1=1532909&r2=1532910&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Wed Oct 16 21:07:28 2013
@@ -407,7 +407,7 @@ static int create_container_directories(
     const char *container_id, char* const* local_dir, char* const* log_dir, const char *work_dir) {
   // create dirs as 0750
   const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
-  if (app_id == NULL || container_id == NULL || user == NULL) {
+  if (app_id == NULL || container_id == NULL || user == NULL || user_detail == NULL || user_detail->pw_name == NULL) {
     fprintf(LOGFILE, 
             "Either app_id, container_id or the user passed is null.\n");
     return -1;
@@ -751,28 +751,11 @@ int initialize_user(const char *user, ch
   return failed ? INITIALIZE_USER_FAILED : 0;
 }
 
-/**
- * Function to prepare the application directories for the container.
- */
-int initialize_app(const char *user, const char *app_id,
-                   const char* nmPrivate_credentials_file,
-                   char* const* local_dirs, char* const* log_roots,
-                   char* const* args) {
-  if (app_id == NULL || user == NULL) {
-    fprintf(LOGFILE, "Either app_id is null or the user passed is null.\n");
-    return INVALID_ARGUMENT_NUMBER;
-  }
-
-  // create the user directory on all disks
-  int result = initialize_user(user, local_dirs);
-  if (result != 0) {
-    return result;
-  }
+int create_log_dirs(const char *app_id, char * const * log_dirs) {
 
-  ////////////// create the log directories for the app on all disks
   char* const* log_root;
   char *any_one_app_log_dir = NULL;
-  for(log_root=log_roots; *log_root != NULL; ++log_root) {
+  for(log_root=log_dirs; *log_root != NULL; ++log_root) {
     char *app_log_dir = get_app_log_directory(*log_root, app_id);
     if (app_log_dir == NULL) {
       // try the next one
@@ -791,7 +774,33 @@ int initialize_app(const char *user, con
     return -1;
   }
   free(any_one_app_log_dir);
-  ////////////// End of creating the log directories for the app on all disks
+  return 0;
+}
+
+
+/**
+ * Function to prepare the application directories for the container.
+ */
+int initialize_app(const char *user, const char *app_id,
+                   const char* nmPrivate_credentials_file,
+                   char* const* local_dirs, char* const* log_roots,
+                   char* const* args) {
+  if (app_id == NULL || user == NULL || user_detail == NULL || user_detail->pw_name == NULL) {
+    fprintf(LOGFILE, "Either app_id is null or the user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  // create the user directory on all disks
+  int result = initialize_user(user, local_dirs);
+  if (result != 0) {
+    return result;
+  }
+
+  // create the log directories for the app on all disks
+  int log_create_result = create_log_dirs(app_id, log_roots);
+  if (log_create_result != 0) {
+    return log_create_result;
+  }
 
   // open up the credentials file
   int cred_file = open_file_as_nm(nmPrivate_credentials_file);
@@ -922,18 +931,34 @@ int launch_container_as_user(const char 
     }
   }
 
+  // create the user directory on all disks
+  int result = initialize_user(user, local_dirs);
+  if (result != 0) {
+    return result;
+  }
+
+  // initializing log dirs
+  int log_create_result = create_log_dirs(app_id, log_dirs);
+  if (log_create_result != 0) {
+    return log_create_result;
+  }
+
   // give up root privs
   if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
     exit_code = SETUID_OPER_FAILED;
     goto cleanup;
   }
 
+  // Create container specific directories as user. If there are no resources
+  // to localize for this container, app-directories and log-directories are
+  // also created automatically as part of this call.
   if (create_container_directories(user, app_id, container_id, local_dirs,
                                    log_dirs, work_dir) != 0) {
     fprintf(LOGFILE, "Could not create container dirs");
     goto cleanup;
   }
 
+
   // 700
   if (copy_file(container_file_source, script_name, script_file_dest,S_IRWXU) != 0) {
     goto cleanup;



Mime
View raw message