hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gkesa...@apache.org
Subject svn commit: r1369164 [4/7] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduc...
Date Fri, 03 Aug 2012 19:00:51 GMT
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java Fri Aug  3 19:00:15 2012
@@ -2,10 +2,10 @@ package org.apache.hadoop.yarn.webapp.lo
 
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 
-import java.io.DataInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Map;
@@ -19,10 +19,11 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 
 import com.google.inject.Inject;
@@ -41,8 +42,9 @@ public class AggregatedLogsBlock extends
     ContainerId containerId = verifyAndGetContainerId(html);
     NodeId nodeId = verifyAndGetNodeId(html);
     String appOwner = verifyAndGetAppOwner(html);
+    LogLimits logLimits = verifyAndGetLogLimits(html);
     if (containerId == null || nodeId == null || appOwner == null
-        || appOwner.isEmpty()) {
+        || appOwner.isEmpty() || logLimits == null) {
       return;
     }
     
@@ -113,24 +115,29 @@ public class AggregatedLogsBlock extends
       return;
     }
 
-    DataInputStream valueStream;
-    LogKey key = new LogKey();
+    String desiredLogType = $(CONTAINER_LOG_TYPE);
     try {
-      valueStream = reader.next(key);
-      while (valueStream != null
-          && !key.toString().equals(containerId.toString())) {
-        valueStream = reader.next(key);
-      }
-      if (valueStream == null) {
+      AggregatedLogFormat.ContainerLogsReader logReader =
+          reader.getContainerLogsReader(containerId);
+      if (logReader == null) {
         html.h1()._(
             "Logs not available for " + logEntity
                 + ". Could be caused by the rentention policy")._();
         return;
       }
-      writer().write("<pre>");
-      AggregatedLogFormat.LogReader.readAcontainerLogs(valueStream, writer());
-      writer().write("</pre>");
-      return;
+
+      boolean foundLog = readContainerLogs(html, logReader, logLimits,
+          desiredLogType);
+
+      if (!foundLog) {
+        if (desiredLogType.isEmpty()) {
+          html.h1("No logs available for container " + containerId.toString());
+        } else {
+          html.h1("Unable to locate '" + desiredLogType
+              + "' log for container " + containerId.toString());
+        }
+        return;
+      }
     } catch (IOException e) {
       html.h1()._("Error getting logs for " + logEntity)._();
       LOG.error("Error getting logs for " + logEntity, e);
@@ -138,6 +145,76 @@ public class AggregatedLogsBlock extends
     }
   }
 
+  /**
+   * Writes each log of the container whose type matches desiredLogType (null
+   * or empty matches every type), limited to the window given by logLimits.
+   * @return true if at least one log was written
+   */
+  private boolean readContainerLogs(Block html,
+      AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
+      String desiredLogType) throws IOException {
+    int bufferSize = 65536;
+    char[] cbuf = new char[bufferSize];
+    boolean foundLog = false;
+    String logType = logReader.nextLog();
+    while (logType != null) {
+      if (desiredLogType == null || desiredLogType.isEmpty()
+          || desiredLogType.equals(logType)) {
+        long logLength = logReader.getCurrentLogLength();
+        if (foundLog) {
+          html.pre()._("\n\n")._();
+        }
+        html.p()._("Log Type: " + logType)._();
+        html.p()._("Log Length: " + Long.toString(logLength))._();
+
+        // Negative limits are offsets from the end of the log. Clamp both
+        // into [0, logLength] and never let end fall before start.
+        long start = logLimits.start < 0
+            ? logLength + logLimits.start : logLimits.start;
+        start = Math.min(Math.max(start, 0), logLength);
+        long end = logLimits.end < 0
+            ? logLength + logLimits.end : logLimits.end;
+        end = Math.min(Math.max(end, start), logLength);
+        long toRead = end - start;
+        if (toRead < logLength) {
+          html.p()._("Showing " + toRead + " bytes of " + logLength
+              + " total. Click ")
+              .a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
+                  $(ENTITY_STRING), $(APP_OWNER),
+                  logType, "?start=0"), "here").
+              _(" for the full log.")._();
+        }
+
+        // Like java.io.Reader, skip() presumably returns 0 (never a negative
+        // value) at end of stream; treat 0 as EOF so this loop cannot spin.
+        // NOTE(review): the limits derive from a length shown as bytes, but
+        // skip()/read() count chars, so multi-byte text can reach EOF early.
+        long totalSkipped = 0;
+        while (totalSkipped < start) {
+          long ret = logReader.skip(start - totalSkipped);
+          if (ret <= 0) {
+            throw new IOException("Premature EOF from container log");
+          }
+          totalSkipped += ret;
+        }
+
+        int len = 0;
+        int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+        PRE<Hamlet> pre = html.pre();
+        while (toRead > 0
+            && (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
+          pre._(new String(cbuf, 0, len));
+          toRead = toRead - len;
+          currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+        }
+        pre._();
+        foundLog = true;
+      }
+      logType = logReader.nextLog();
+    }
+    return foundLog;
+  }
+
   private ContainerId verifyAndGetContainerId(Block html) {
     String containerIdStr = $(CONTAINER_ID);
     if (containerIdStr == null || containerIdStr.isEmpty()) {
@@ -180,4 +257,44 @@ public class AggregatedLogsBlock extends
     }
     return appOwner;
   }
+
+  private static class LogLimits { // parsed "start"/"end" request parameters
+    long start; // first offset to show; negative = offset from the log's end
+    long end;   // offset to stop at; negative = offset from the log's end
+  }
+
+  private LogLimits verifyAndGetLogLimits(Block html) {
+    long start = -4096; // default: last 4096 of the log (negative = from end)
+    long end = Long.MAX_VALUE; // default: read to the end of the log
+    boolean isValid = true;
+    // Override the defaults with the optional "start" request parameter.
+    String startStr = $("start");
+    if (startStr != null && !startStr.isEmpty()) {
+      try {
+        start = Long.parseLong(startStr);
+      } catch (NumberFormatException e) {
+        isValid = false;
+        html.h1()._("Invalid log start value: " + startStr)._();
+      }
+    }
+    // ...and with the optional "end" request parameter.
+    String endStr = $("end");
+    if (endStr != null && !endStr.isEmpty()) {
+      try {
+        end = Long.parseLong(endStr);
+      } catch (NumberFormatException e) {
+        isValid = false;
+        html.h1()._("Invalid log end value: " + endStr)._();
+      }
+    }
+    // Null tells the caller to stop; each bad value already got an h1 above.
+    if (!isValid) {
+      return null;
+    }
+    // Stored as given; clamping to the log length happens when applied.
+    LogLimits limits = new LogLimits();
+    limits.start = start;
+    limits.end = end;
+    return limits;
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Fri Aug  3 19:00:15 2012
@@ -241,7 +241,7 @@
 
   <!-- Node Manager Configs -->
   <property>
-    <description>address of node manager IPC.</description>
+    <description>The address of the container manager in the NM.</description>
     <name>yarn.nodemanager.address</name>
     <value>0.0.0.0:0</value>
   </property>
@@ -538,16 +538,9 @@
   </property>
 
   <property>
-    <description>Classpath for typical applications.</description>
+    <description>CLASSPATH for YARN applications. A comma-separated list
+    of CLASSPATH entries.</description>
      <name>yarn.application.classpath</name>
-     <value>
-        $HADOOP_CONF_DIR,
-        $HADOOP_COMMON_HOME/share/hadoop/common/*,
-        $HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
-        $HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
-        $HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
-        $YARN_HOME/share/hadoop/mapreduce/*,
-        $YARN_HOME/share/hadoop/mapreduce/lib/*
-     </value>
+     <value>$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/share/hadoop/common/*,$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,$YARN_HOME/share/hadoop/mapreduce/*,$YARN_HOME/share/hadoop/mapreduce/lib/*</value>
   </property>
 </configuration>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/MockApps.java Fri Aug  3 19:00:15 2012
@@ -19,18 +19,13 @@
 package org.apache.hadoop.yarn;
 
 import java.util.Iterator;
-import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.util.Records;
 
 import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
 
 /**
  * Utilities to generate fake test apps
@@ -66,137 +61,6 @@ public class MockApps {
     }
   }
 
-  public static List<ApplicationReport> genApps(int n) {
-    List<ApplicationReport> list = Lists.newArrayList();
-    for (int i = 0; i < n; ++i) {
-      list.add(newApp(i));
-    }
-    return list;
-  }
-
-  public static ApplicationReport newApp(int i) {
-    final ApplicationId id = newAppID(i);
-    final YarnApplicationState state = newAppState();
-    final String user = newUserName();
-    final String name = newAppName();
-    final String queue = newQueue();
-    final FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
-    return new ApplicationReport() {
-      private ApplicationResourceUsageReport appUsageReport;
-      @Override public ApplicationId getApplicationId() { return id; }
-      @Override public String getUser() { return user; }
-      @Override public String getName() { return name; }
-      @Override public YarnApplicationState getYarnApplicationState() { return state; }
-      @Override public String getQueue() { return queue; }
-      @Override public String getTrackingUrl() { return ""; }
-      @Override public String getOriginalTrackingUrl() { return ""; }
-      @Override public FinalApplicationStatus getFinalApplicationStatus() { return finishState; }
-      @Override
-      public ApplicationResourceUsageReport getApplicationResourceUsageReport() {
-        return this.appUsageReport;
-      }
-      public void setApplicationId(ApplicationId applicationId) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public void setTrackingUrl(String url) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override public void setOriginalTrackingUrl(String url) { }
-      @Override
-      public void setApplicationResourceUsageReport(ApplicationResourceUsageReport appResources) {
-        this.appUsageReport = appResources;
-      }
-      @Override
-      public void setName(String name) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public void setQueue(String queue) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public void setYarnApplicationState(YarnApplicationState state) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public void setUser(String user) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public String getDiagnostics() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-      @Override
-      public void setDiagnostics(String diagnostics) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public String getHost() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-      @Override
-      public void setHost(String host) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public int getRpcPort() {
-        // TODO Auto-generated method stub
-        return 0;
-      }
-      @Override
-      public void setRpcPort(int rpcPort) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public String getClientToken() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-      @Override
-      public void setClientToken(String clientToken) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public long getStartTime() {
-        // TODO Auto-generated method stub
-        return 0;
-      }
-
-      @Override
-      public void setStartTime(long startTime) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public long getFinishTime() {
-        // TODO Auto-generated method stub
-        return 0;
-      }
-      @Override
-      public void setFinishTime(long finishTime) {
-        // TODO Auto-generated method stub
-
-      }
-      @Override
-      public void setFinalApplicationStatus(FinalApplicationStatus finishState) {
-		// TODO Auto-generated method stub
-      }
-    };
-  }
-
   public static ApplicationId newAppID(int i) {
     ApplicationId id = Records.newRecord(ApplicationId.class);
     id.setClusterTimestamp(TS);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/ContainerTokenSecretManager.java Fri Aug  3 19:00:15 2012
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.security;
 
+import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -25,9 +26,21 @@ import javax.crypto.SecretKey;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 
+/**
+ * SecretManager for ContainerTokens. Used by both RM and NM and hence is
+ * present in yarn-server-common package.
+ * 
+ */
 public class ContainerTokenSecretManager extends
     SecretManager<ContainerTokenIdentifier> {
 
@@ -36,7 +49,34 @@ public class ContainerTokenSecretManager
 
   Map<String, SecretKey> secretkeys =
     new ConcurrentHashMap<String, SecretKey>();
-  
+
+  private final long containerTokenExpiryInterval;
+
+  public ContainerTokenSecretManager(Configuration conf) {
+    this.containerTokenExpiryInterval = // ms added to "now" for token expiry
+        conf.getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
+  }
+
+  public ContainerToken createContainerToken(ContainerId containerId,
+      NodeId nodeId, Resource capability) { // null on IllegalArgumentException
+    try {
+      long expiryTimeStamp =
+          System.currentTimeMillis() + containerTokenExpiryInterval;
+      ContainerTokenIdentifier tokenIdentifier =
+          new ContainerTokenIdentifier(containerId, nodeId.toString(),
+            capability, expiryTimeStamp);
+      return BuilderUtils.newContainerToken(nodeId,
+        ByteBuffer.wrap(this.createPassword(tokenIdentifier)), tokenIdentifier);
+    } catch (IllegalArgumentException e) {
+      // May indicate the node's host could not be resolved (e.g. DNS is down).
+      // Log and return null instead of throwing so the RM is not brought down;
+      // callers must treat null as "container token could not be created".
+      LOG.error("Error trying to create new container", e);
+      return null;
+    }
+  }
+
   // Used by master for generation of secretyKey per host
   public SecretKey createAndGetSecretKey(CharSequence hostName) {
     String hostNameStr = hostName.toString();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml Fri Aug  3 19:00:15 2012
@@ -49,6 +49,9 @@
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-antrun-plugin</artifactId>
+            <configuration>
+              <skipTests>false</skipTests>
+            </configuration>
             <executions>
               <execution>
                 <id>make</id>
@@ -72,7 +75,10 @@
                 <phase>test</phase>
                 <configuration>
                   <target>
-                    <exec executable="test-container-executor" dir="${project.build.directory}/native" failonerror="true">
+                    <exec executable="sh" failonerror="true" dir="${project.build.directory}/native">
+                      <arg value="-c"/>
+                      <arg value="[ x$SKIPTESTS = xtrue ] || test-container-executor"/>
+                      <env key="SKIPTESTS" value="${skipTests}"/>
                     </exec>
                   </target>
                 </configuration>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java Fri Aug  3 19:00:15 2012
@@ -93,23 +93,7 @@ public class LocalDirsHandlerService ext
 
     @Override
     public void run() {
-      boolean newFailure = false;
-      if (localDirs.checkDirs()) {
-        newFailure = true;
-      }
-      if (logDirs.checkDirs()) {
-        newFailure = true;
-      }
-
-      if (newFailure) {
-        LOG.info("Disk(s) failed. " + getDisksHealthReport());
-        updateDirsInConfiguration();
-        if (!areDisksHealthy()) {
-          // Just log.
-          LOG.error("Most of the disks failed. " + getDisksHealthReport());
-        }
-      }
-      lastDisksCheckTime = System.currentTimeMillis();
+      checkDirs();
     }
   }
 
@@ -135,6 +119,10 @@ public class LocalDirsHandlerService ext
         YarnConfiguration.DEFAULT_NM_MIN_HEALTHY_DISKS_FRACTION);
     lastDisksCheckTime = System.currentTimeMillis();
     super.init(conf);
+
+    // Check the disk health immediately to weed out bad directories
+    // before other init code attempts to use them.
+    checkDirs();
   }
 
   /**
@@ -144,10 +132,8 @@ public class LocalDirsHandlerService ext
   public void start() {
     if (isDiskHealthCheckerEnabled) {
       dirsHandlerScheduler = new Timer("DiskHealthMonitor-Timer", true);
-      // Start the timer task for disk health checking immediately and
-      // then run periodically at interval time.
-      dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask, 0,
-                                                   diskHealthCheckInterval);
+      dirsHandlerScheduler.scheduleAtFixedRate(monitoringTimerTask,
+          diskHealthCheckInterval, diskHealthCheckInterval);
     }
     super.start();
   }
@@ -253,6 +239,26 @@ public class LocalDirsHandlerService ext
                       logDirs.toArray(new String[logDirs.size()]));
   }
 
+  /**
+   * Re-checks the local and log dirs. On a new failure, logs a health report
+   * and calls updateDirsInConfiguration(). Always updates lastDisksCheckTime.
+   */
+  private void checkDirs() {
+    // Run both checks unconditionally, local dirs first; neither is skipped.
+    boolean localDirsFailed = localDirs.checkDirs();
+    boolean logDirsFailed = logDirs.checkDirs();
+
+    if (localDirsFailed || logDirsFailed) {
+      LOG.info("Disk(s) failed. " + getDisksHealthReport());
+      updateDirsInConfiguration();
+      if (!areDisksHealthy()) {
+        // Only logged; no further action is taken here.
+        LOG.error("Most of the disks failed. " + getDisksHealthReport());
+      }
+    }
+    lastDisksCheckTime = System.currentTimeMillis();
+  }
+
   public Path getLocalPathForWrite(String pathStr) throws IOException {
     return localDirsAllocator.getLocalPathForWrite(pathStr, getConfig());
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Fri Aug  3 19:00:15 2012
@@ -33,6 +33,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
@@ -115,7 +116,7 @@ public class NodeManager extends Composi
     if (UserGroupInformation.isSecurityEnabled()) {
       LOG.info("Security is enabled on NodeManager. "
           + "Creating ContainerTokenSecretManager");
-      this.containerTokenSecretManager = new ContainerTokenSecretManager();
+      this.containerTokenSecretManager = new ContainerTokenSecretManager(conf);
     }
 
     this.aclsManager = new ApplicationACLsManager(conf);
@@ -279,6 +280,7 @@ public class NodeManager extends Composi
   }
 
   public static void main(String[] args) {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
     NodeManager nodeManager = new NodeManager();
     nodeManager.initAndStartNodeManager(false);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Fri Aug  3 19:00:15 2012
@@ -324,6 +324,15 @@ public class ContainerManagerImpl extend
                 + containerIDStr);
       } else {
 
+        // Ensure the token is not expired. 
+        // Token expiry is not checked for stopContainer/getContainerStatus
+        if (tokenId.getExpiryTimeStamp() < System.currentTimeMillis()) {
+          unauthorized = true;
+          messageBuilder.append("\nThis token is expired. current time is "
+              + System.currentTimeMillis() + " found "
+              + tokenId.getExpiryTimeStamp());
+        }
+        
         Resource resource = tokenId.getResource();
         if (!resource.equals(launchContext.getResource())) {
           unauthorized = true;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Fri Aug  3 19:00:15 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -315,6 +316,7 @@ public class ContainerLocalizer {
   }
 
   public static void main(String[] argv) throws Throwable {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     // usage: $0 user appId locId host port app_log_dir user_dir [user_dir]*
     // let $x = $x/usercache for $local.dir
     // MKDIR $x/$user/appcache/$appid

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/LocalResourcesTrackerImpl.java Fri Aug  3 19:00:15 2012
@@ -17,6 +17,7 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import java.io.File;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
 
 /**
  * A collection of {@link LocalizedResource}s all of same
@@ -67,6 +69,12 @@ class LocalResourcesTrackerImpl implemen
     switch (event.getType()) {
     case REQUEST:
     case LOCALIZED:
+      if (rsrc != null && (!isResourcePresent(rsrc))) {
+        LOG.info("Resource " + rsrc.getLocalPath()
+            + " is missing, localizing it again");
+        localrsrc.remove(req);
+        rsrc = null;
+      }
       if (null == rsrc) {
         rsrc = new LocalizedResource(req, dispatcher);
         localrsrc.put(req, rsrc);
@@ -82,6 +90,24 @@ class LocalResourcesTrackerImpl implemen
     rsrc.handle(event);
   }
 
+  /**
+   * Checks whether a LOCALIZED resource still has its file on local disk.
+   * Resources in any other state are always reported as present.
+   * 
+   * @param rsrc the tracked resource to check
+   * @return false only if rsrc is LOCALIZED and its local file no longer exists
+   */
+  public boolean isResourcePresent(LocalizedResource rsrc) {
+    boolean ret = true; // default: non-LOCALIZED resources count as present
+    if (rsrc.getState() == ResourceState.LOCALIZED) {
+      File file = new File(rsrc.getLocalPath().toUri().getRawPath().toString());
+      if (!file.exists()) { // NOTE(review): getRawPath() is not percent-decoded; paths with escaped chars may be reported missing
+        ret = false;
+      }
+    }
+    return ret;
+  }
+  
   @Override
   public boolean contains(LocalResourceRequest resource) {
     return localrsrc.containsKey(resource);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Fri Aug  3 19:00:15 2012
@@ -342,14 +342,14 @@ public class LogAggregationService exten
     // A container is complete. Put this containers' logs up for aggregation if
     // this containers' logs are needed.
 
-    if (!this.appLogAggregators.containsKey(
-        containerId.getApplicationAttemptId().getApplicationId())) {
-      throw new YarnException("Application is not initialized yet for "
-          + containerId);
+    AppLogAggregator aggregator = this.appLogAggregators.get(
+        containerId.getApplicationAttemptId().getApplicationId());
+    if (aggregator == null) {
+      LOG.warn("Log aggregation is not initialized for " + containerId
+          + ", did it fail to start?");
+      return;
     }
-    this.appLogAggregators.get(
-        containerId.getApplicationAttemptId().getApplicationId())
-        .startContainerLogAggregation(containerId, exitCode == 0);
+    aggregator.startContainerLogAggregation(containerId, exitCode == 0);
   }
 
   private void stopApp(ApplicationId appId) {
@@ -357,11 +357,13 @@ public class LogAggregationService exten
     // App is complete. Finish up any containers' pending log aggregation and
     // close the application specific logFile.
 
-    if (!this.appLogAggregators.containsKey(appId)) {
-      throw new YarnException("Application is not initialized yet for "
-          + appId);
+    AppLogAggregator aggregator = this.appLogAggregators.get(appId);
+    if (aggregator == null) {
+      LOG.warn("Log aggregation is not initialized for " + appId
+          + ", did it fail to start?");
+      return;
     }
-    this.appLogAggregators.get(appId).finishLogAggregation();
+    aggregator.finishLogAggregation();
   }
 
   @Override

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c Fri Aug  3 19:00:15 2012
@@ -97,7 +97,7 @@ int write_config_file(char *file_name) {
     return EXIT_FAILURE;
   }
   fprintf(file, "banned.users=bannedUser\n");
-  fprintf(file, "min.user.id=1000\n");
+  fprintf(file, "min.user.id=500\n");
   fclose(file);
   return 0;
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Fri Aug  3 19:00:15 2012
@@ -86,7 +86,8 @@ public class TestEventFlow {
     healthChecker.init(conf);
     LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
     NodeManagerMetrics metrics = NodeManagerMetrics.create();
-    ContainerTokenSecretManager containerTokenSecretManager =  new ContainerTokenSecretManager();
+    ContainerTokenSecretManager containerTokenSecretManager =
+        new ContainerTokenSecretManager(conf);
     NodeStatusUpdater nodeStatusUpdater =
         new NodeStatusUpdaterImpl(context, dispatcher, healthChecker, metrics, containerTokenSecretManager) {
       @Override

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java Fri Aug  3 19:00:15 2012
@@ -70,7 +70,8 @@ public abstract class BaseContainerManag
   protected static File localLogDir;
   protected static File remoteLogDir;
   protected static File tmpDir;
-  protected ContainerTokenSecretManager containerTokenSecretManager = new ContainerTokenSecretManager();
+  protected ContainerTokenSecretManager containerTokenSecretManager =
+      new ContainerTokenSecretManager(new Configuration());
 
   protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Fri Aug  3 19:00:15 2012
@@ -385,7 +385,7 @@ public class TestContainerManager extend
     delSrvc.init(conf);
 
     ContainerTokenSecretManager containerTokenSecretManager = new 
-        ContainerTokenSecretManager();
+        ContainerTokenSecretManager(conf);
     containerManager = new ContainerManagerImpl(context, exec, delSrvc,
         nodeStatusUpdater, metrics, containerTokenSecretManager,
         new ApplicationACLsManager(conf), dirsHandler);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalResourcesTrackerImpl.java Fri Aug  3 19:00:15 2012
@@ -5,6 +5,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.Test;
+import org.mortbay.log.Log;
 
 public class TestLocalResourcesTrackerImpl {
 
@@ -131,6 +134,86 @@ public class TestLocalResourcesTrackerIm
     }
   }
 
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testConsistency() { // tracker must re-create a LOCALIZED resource whose file was deleted
+    String user = "testuser";
+    DrainDispatcher dispatcher = null;
+    try {
+      dispatcher = createDispatcher(new Configuration());
+      EventHandler<LocalizerEvent> localizerEventHandler = mock(EventHandler.class);
+      EventHandler<LocalizerEvent> containerEventHandler = mock(EventHandler.class);
+      dispatcher.register(LocalizerEventType.class, localizerEventHandler);
+      dispatcher.register(ContainerEventType.class, containerEventHandler);
+
+      ContainerId cId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+      LocalizerContext lc1 = new LocalizerContext(user, cId1, null);
+      LocalResourceRequest req1 = createLocalResourceRequest(user, 1, 1,
+          LocalResourceVisibility.PUBLIC);
+      LocalizedResource lr1 = createLocalizedResource(req1, dispatcher);
+      ConcurrentMap<LocalResourceRequest, LocalizedResource> localrsrc = new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>();
+      localrsrc.put(req1, lr1);
+      LocalResourcesTracker tracker = new LocalResourcesTrackerImpl(user,
+          dispatcher, localrsrc);
+
+      ResourceEvent req11Event = new ResourceRequestEvent(req1,
+          LocalResourceVisibility.PUBLIC, lc1);
+
+      ResourceEvent rel11Event = new ResourceReleaseEvent(req1, cId1);
+
+      // Request R1 for C1 (first request)
+      tracker.handle(req11Event);
+
+      dispatcher.await();
+
+      // Verify refCount for R1 is 1
+      Assert.assertEquals(1, lr1.getRefCount());
+
+      dispatcher.await();
+      verifyTrackedResourceCount(tracker, 1);
+
+      // Mark R1 LOCALIZED at file:///tmp/r1 and create a dummy file there
+      ResourceLocalizedEvent rle = new ResourceLocalizedEvent(req1, new Path(
+          "file:///tmp/r1"), 1);
+      lr1.handle(rle);
+      Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+      Assert.assertTrue(createdummylocalizefile(new Path("file:///tmp/r1"))); // NOTE(review): fixed /tmp path; fails if /tmp/r1 already exists
+      LocalizedResource rsrcbefore = tracker.iterator().next(); // the single tracked entry before deletion
+      File resFile = new File(lr1.getLocalPath().toUri().getRawPath()
+          .toString());
+      Assert.assertTrue(resFile.exists());
+      Assert.assertTrue(resFile.delete()); // simulate the localized file disappearing
+
+      // Request R1 again; the tracker should see the file is gone and replace the entry
+      tracker.handle(req11Event);
+
+      dispatcher.await();
+      lr1.handle(rle); // NOTE(review): presumably lr1 is no longer the tracked entry here — confirm intent
+      Assert.assertTrue(lr1.getState().equals(ResourceState.LOCALIZED));
+      LocalizedResource rsrcafter = tracker.iterator().next();
+      if (rsrcbefore == rsrcafter) { // identity check: a new LocalizedResource must be tracked now
+        Assert.fail("Localized resource should not be equal");
+      }
+      // Release R1 for C1
+      tracker.handle(rel11Event);
+    } finally {
+      if (dispatcher != null) {
+        dispatcher.stop();
+      }
+    }
+  }
+
+  private boolean createdummylocalizefile(Path path) { // creates an empty file at path; true only if newly created
+    boolean ret = false;
+    File file = new File(path.toUri().getRawPath().toString());
+    try {
+      ret = file.createNewFile(); // false if the file already exists
+    } catch (IOException e) {
+      e.printStackTrace(); // NOTE(review): error is only printed; caller just sees false
+    }
+    return ret;
+  }
+  
   private void verifyTrackedResourceCount(LocalResourcesTracker tracker,
       int expected) {
     int count = 0;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Fri Aug  3 19:00:15 2012
@@ -380,7 +380,7 @@ public class TestLogAggregationService e
   
   @Test
   @SuppressWarnings("unchecked")
-  public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
+  public void testLogAggregationFailsWithoutKillingNM() throws Exception {
     
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@@ -412,7 +412,16 @@ public class TestLogAggregationService e
         new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!")
     };
     checkEvents(appEventHandler, expectedEvents, false,
-        "getType", "getApplicationID", "getDiagnostic");    
+        "getType", "getApplicationID", "getDiagnostic");
+
+    // verify trying to collect logs for containers/apps we don't know about
+    // doesn't blow up and tear down the NM
+    logAggregationService.handle(new LogHandlerContainerFinishedEvent(
+        BuilderUtils.newContainerId(4, 1, 1, 1), 0));
+    dispatcher.await();
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(
+        BuilderUtils.newApplicationId(1, 5)));
+    dispatcher.await();
   }
   
   private void writeContainerLogs(File appLogDir, ContainerId containerId)

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Fri Aug  3 19:00:15 2012
@@ -389,7 +389,9 @@ public class ClientRMService extends Abs
         appReports = new ArrayList<ApplicationReport>(
             apps.size());
         for (RMApp app : apps) {
-          appReports.add(app.createAndGetApplicationReport(true));
+          if (app.getQueue().equals(queueInfo.getQueueName())) {
+            appReports.add(app.createAndGetApplicationReport(true));
+          }
         }
       }
       queueInfo.setApplications(appReports);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java Fri Aug  3 19:00:15 2012
@@ -51,6 +51,8 @@ public interface RMContext {
 
   AMLivelinessMonitor getAMLivelinessMonitor();
 
+  AMLivelinessMonitor getAMFinishingMonitor();
+
   ContainerAllocationExpirer getContainerAllocationExpirer();
   
   DelegationTokenRenewer getDelegationTokenRenewer();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java Fri Aug  3 19:00:15 2012
@@ -49,6 +49,7 @@ public class RMContextImpl implements RM
     = new ConcurrentHashMap<String, RMNode>();
 
   private AMLivelinessMonitor amLivelinessMonitor;
+  private AMLivelinessMonitor amFinishingMonitor;
   private ContainerAllocationExpirer containerAllocationExpirer;
   private final DelegationTokenRenewer tokenRenewer;
   private final ApplicationTokenSecretManager appTokenSecretManager;
@@ -56,12 +57,14 @@ public class RMContextImpl implements RM
   public RMContextImpl(Store store, Dispatcher rmDispatcher,
       ContainerAllocationExpirer containerAllocationExpirer,
       AMLivelinessMonitor amLivelinessMonitor,
+      AMLivelinessMonitor amFinishingMonitor, // returned as-is by getAMFinishingMonitor()
       DelegationTokenRenewer tokenRenewer,
       ApplicationTokenSecretManager appTokenSecretManager) {
     this.store = store;
     this.rmDispatcher = rmDispatcher;
     this.containerAllocationExpirer = containerAllocationExpirer;
     this.amLivelinessMonitor = amLivelinessMonitor;
+    this.amFinishingMonitor = amFinishingMonitor;
     this.tokenRenewer = tokenRenewer;
     this.appTokenSecretManager = appTokenSecretManager;
   }
@@ -107,6 +110,11 @@ public class RMContextImpl implements RM
   }
 
   @Override
+  public AMLivelinessMonitor getAMFinishingMonitor() { // monitor supplied to the constructor
+    return this.amFinishingMonitor;
+  }
+
+  @Override
   public DelegationTokenRenewer getDelegationTokenRenewer() {
     return tokenRenewer;
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java Fri Aug  3 19:00:15 2012
@@ -34,6 +34,7 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -99,8 +100,7 @@ public class ResourceManager extends Com
   protected ClientToAMSecretManager clientToAMSecretManager =
       new ClientToAMSecretManager();
   
-  protected ContainerTokenSecretManager containerTokenSecretManager =
-      new ContainerTokenSecretManager();
+  protected ContainerTokenSecretManager containerTokenSecretManager;
 
   protected ApplicationTokenSecretManager appTokenSecretManager;
 
@@ -150,16 +150,21 @@ public class ResourceManager extends Com
         this.rmDispatcher);
     addService(this.containerAllocationExpirer);
 
+    this.containerTokenSecretManager  = new ContainerTokenSecretManager(conf);
+
     AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
     addService(amLivelinessMonitor);
 
+    AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
+    addService(amFinishingMonitor);
+
     DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
     addService(tokenRenewer);
     
-    this.rmContext =
-        new RMContextImpl(this.store, this.rmDispatcher,
-          this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer,
-          this.appTokenSecretManager);
+    this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
+        this.containerAllocationExpirer,
+        amLivelinessMonitor, amFinishingMonitor,
+        tokenRenewer, this.appTokenSecretManager);
 
     // Register event handler for NodesListManager
     this.nodesListManager = new NodesListManager(this.rmContext);
@@ -611,6 +616,11 @@ public class ResourceManager extends Com
   }
 
   @Private
+  public ContainerTokenSecretManager getContainerTokenSecretManager() { // NOTE(review): null until the field is assigned from conf during setup
+    return this.containerTokenSecretManager;
+  }
+
+  @Private
   public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
     return this.appTokenSecretManager;
   }
@@ -622,6 +632,7 @@ public class ResourceManager extends Com
   }
   
   public static void main(String argv[]) {
+    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
     try {
       Configuration conf = new YarnConfiguration();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Fri Aug  3 19:00:15 2012
@@ -91,6 +91,16 @@ public class Resources {
   public static Resource multiply(Resource lhs, int by) {
     return multiplyTo(clone(lhs), by);
   }
+  
+  /**
+   * Multiply a resource by a {@code double}. Only memory is scaled; the
+   * product is truncated toward zero by the cast to {@code int}.
+   */
+  public static Resource multiply(Resource lhs, double by) {
+    Resource out = clone(lhs);
+    out.setMemory((int) (lhs.getMemory() * by));
+    return out;
+  }
 
   public static boolean equals(Resource lhs, Resource rhs) {
     return lhs.getMemory() == rhs.getMemory();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Fri Aug  3 19:00:15 2012
@@ -27,6 +27,7 @@ public enum RMAppEventType {
   APP_REJECTED,
   APP_ACCEPTED,
   ATTEMPT_REGISTERED,
+  ATTEMPT_FINISHING,
   ATTEMPT_FINISHED, // Will send the final state
   ATTEMPT_FAILED,
   ATTEMPT_KILLED,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Fri Aug  3 19:00:15 2012
@@ -147,6 +147,8 @@ public class RMAppImpl implements RMApp 
      // Transitions from RUNNING state
     .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
         RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
+    .addTransition(RMAppState.RUNNING, RMAppState.FINISHING,
+        RMAppEventType.ATTEMPT_FINISHING, new RMAppFinishingTransition())
     .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
         RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
     .addTransition(RMAppState.RUNNING,
@@ -156,12 +158,24 @@ public class RMAppImpl implements RMApp 
     .addTransition(RMAppState.RUNNING, RMAppState.KILLED,
         RMAppEventType.KILL, new KillAppAndAttemptTransition())
 
+     // Transitions from FINISHING state
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
+        RMAppEventType.ATTEMPT_FINISHED, FINAL_TRANSITION)
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
+        RMAppEventType.KILL, new KillAppAndAttemptTransition())
+    // ignorable transitions
+    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+        RMAppEventType.NODE_UPDATE)
+
      // Transitions from FINISHED state
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
         RMAppEventType.KILL)
      // ignorable transitions
     .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
-        RMAppEventType.NODE_UPDATE)
+        EnumSet.of(
+            RMAppEventType.NODE_UPDATE,
+            RMAppEventType.ATTEMPT_FINISHING,
+            RMAppEventType.ATTEMPT_FINISHED))
 
      // Transitions from FAILED state
     .addTransition(RMAppState.FAILED, RMAppState.FAILED,
@@ -192,7 +206,8 @@ public class RMAppImpl implements RMApp 
       BuilderUtils.newApplicationResourceUsageReport(-1, -1,
           Resources.createResource(-1), Resources.createResource(-1),
           Resources.createResource(-1));
-
+  private static final int DUMMY_APPLICATION_ATTEMPT_NUMBER = -1;
+  
   public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, String clientTokenStr,
@@ -333,10 +348,12 @@ public class RMAppImpl implements RMApp 
     case NEW:
       return YarnApplicationState.NEW;
     case SUBMITTED:
-    case ACCEPTED:
       return YarnApplicationState.SUBMITTED;
+    case ACCEPTED:
+      return YarnApplicationState.ACCEPTED;
     case RUNNING:
       return YarnApplicationState.RUNNING;
+    case FINISHING:
     case FINISHED:
       return YarnApplicationState.FINISHED;
     case KILLED:
@@ -355,6 +372,7 @@ public class RMAppImpl implements RMApp 
     case RUNNING:
       return FinalApplicationStatus.UNDEFINED;    
     // finished without a proper final state is the same as failed  
+    case FINISHING:
     case FINISHED:
     case FAILED:
       return FinalApplicationStatus.FAILED;
@@ -382,6 +400,7 @@ public class RMAppImpl implements RMApp 
     this.readLock.lock();
 
     try {
+      ApplicationAttemptId currentApplicationAttemptId = null;
       String clientToken = UNAVAILABLE;
       String trackingUrl = UNAVAILABLE;
       String host = UNAVAILABLE;
@@ -392,23 +411,31 @@ public class RMAppImpl implements RMApp 
       String diags = UNAVAILABLE;
       if (allowAccess) {
         if (this.currentAttempt != null) {
+          currentApplicationAttemptId = this.currentAttempt.getAppAttemptId();
           trackingUrl = this.currentAttempt.getTrackingUrl();
           origTrackingUrl = this.currentAttempt.getOriginalTrackingUrl();
           clientToken = this.currentAttempt.getClientToken();
           host = this.currentAttempt.getHost();
           rpcPort = this.currentAttempt.getRpcPort();
           appUsageReport = currentAttempt.getApplicationResourceUsageReport();
+        } else {
+          currentApplicationAttemptId = 
+              BuilderUtils.newApplicationAttemptId(this.applicationId, 
+                  DUMMY_APPLICATION_ATTEMPT_NUMBER);
         }
         diags = this.diagnostics.toString();
       } else {
         appUsageReport = DUMMY_APPLICATION_RESOURCE_USAGE_REPORT;
+        currentApplicationAttemptId = 
+            BuilderUtils.newApplicationAttemptId(this.applicationId, 
+                DUMMY_APPLICATION_ATTEMPT_NUMBER);
       }
-      return BuilderUtils.newApplicationReport(this.applicationId, this.user,
-          this.queue, this.name, host, rpcPort, clientToken,
-          createApplicationState(this.stateMachine.getCurrentState()),
-          diags, trackingUrl,
-          this.startTime, this.finishTime, finishState, appUsageReport,
-          origTrackingUrl);
+      return BuilderUtils.newApplicationReport(this.applicationId,
+          currentApplicationAttemptId, this.user, this.queue,
+          this.name, host, rpcPort, clientToken,
+          createApplicationState(this.stateMachine.getCurrentState()), diags,
+          trackingUrl, this.startTime, this.finishTime, finishState,
+          appUsageReport, origTrackingUrl);
     } finally {
       this.readLock.unlock();
     }
@@ -537,6 +564,14 @@ public class RMAppImpl implements RMApp 
     };
   }
 
+  private static final class RMAppFinishingTransition extends
+      RMAppTransition {
+    @Override
+    public void transition(RMAppImpl app, RMAppEvent event) {
+      app.finishTime = System.currentTimeMillis();
+    }
+  }
+
   private static class AppKilledTransition extends FinalTransition {
     @Override
     public void transition(RMAppImpl app, RMAppEvent event) {
@@ -580,7 +615,9 @@ public class RMAppImpl implements RMApp 
         app.handler.handle(
             new RMNodeCleanAppEvent(nodeId, app.applicationId));
       }
-      app.finishTime = System.currentTimeMillis();
+      if (app.getState() != RMAppState.FINISHING) {
+        app.finishTime = System.currentTimeMillis();
+      }
       app.handler.handle(
           new RMAppManagerEvent(app.applicationId,
           RMAppManagerEventType.APP_COMPLETED));
@@ -599,21 +636,32 @@ public class RMAppImpl implements RMApp 
     @Override
     public RMAppState transition(RMAppImpl app, RMAppEvent event) {
 
-      RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent)event);
-      if (app.attempts.size() == app.maxRetries) {
-        String msg = "Application " + app.getApplicationId()
-        + " failed " + app.maxRetries
-        + " times due to " + failedEvent.getDiagnostics()
-        + ". Failing the application.";
+      RMAppFailedAttemptEvent failedEvent = ((RMAppFailedAttemptEvent) event);
+      boolean retryApp = true;
+      String msg = null;
+      if (app.submissionContext.getUnmanagedAM()) {
+        // RM does not manage the AM. Do not retry
+        retryApp = false;
+        msg = "Unmanaged application " + app.getApplicationId()
+            + " failed due to " + failedEvent.getDiagnostics()
+            + ". Failing the application.";
+      } else if (app.attempts.size() == app.maxRetries) {
+        retryApp = false;
+        msg = "Application " + app.getApplicationId() + " failed "
+            + app.maxRetries + " times due to " + failedEvent.getDiagnostics()
+            + ". Failing the application.";
+      }
+
+      if (retryApp) {
+        app.createNewAttempt();
+        return initialState;
+      } else {
         LOG.info(msg);
         app.diagnostics.append(msg);
         // Inform the node for app-finish
         FINAL_TRANSITION.transition(app, event);
         return RMAppState.FAILED;
       }
-
-      app.createNewAttempt();
-      return initialState;
     }
 
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppState.java Fri Aug  3 19:00:15 2012
@@ -19,5 +19,5 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
 
 public enum RMAppState {
-  NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED
+  NEW, SUBMITTED, ACCEPTED, RUNNING, FINISHING, FINISHED, FAILED, KILLED
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Fri Aug  3 19:00:15 2012
@@ -143,16 +143,24 @@ public class RMAppAttemptImpl implements
       .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.KILLED,
           RMAppAttemptEventType.KILL,
           new BaseFinalTransition(RMAppAttemptState.KILLED))
-
+      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FAILED,
+          RMAppAttemptEventType.REGISTERED,
+          new UnexpectedAMRegisteredTransition())
+          
       // Transitions from SUBMITTED state
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
           RMAppAttemptEventType.APP_REJECTED, new AppRejectedTransition())
-      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.SCHEDULED,
-          RMAppAttemptEventType.APP_ACCEPTED, new ScheduleTransition())
+      .addTransition(RMAppAttemptState.SUBMITTED, 
+          EnumSet.of(RMAppAttemptState.LAUNCHED, RMAppAttemptState.SCHEDULED),
+          RMAppAttemptEventType.APP_ACCEPTED, 
+          new ScheduleTransition())
       .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.KILLED,
           RMAppAttemptEventType.KILL,
           new BaseFinalTransition(RMAppAttemptState.KILLED))
-
+      .addTransition(RMAppAttemptState.SUBMITTED, RMAppAttemptState.FAILED,
+          RMAppAttemptEventType.REGISTERED,
+          new UnexpectedAMRegisteredTransition())
+          
        // Transitions from SCHEDULED State
       .addTransition(RMAppAttemptState.SCHEDULED,
           RMAppAttemptState.ALLOCATED,
@@ -173,7 +181,7 @@ public class RMAppAttemptImpl implements
           RMAppAttemptEventType.LAUNCH_FAILED, new LaunchFailedTransition())
       .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.KILLED,
           RMAppAttemptEventType.KILL, new KillAllocatedAMTransition())
-
+          
        // Transitions from LAUNCHED State
       .addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
@@ -189,7 +197,8 @@ public class RMAppAttemptImpl implements
           new FinalTransition(RMAppAttemptState.KILLED))
 
        // Transitions from RUNNING State
-      .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.FINISHED,
+      .addTransition(RMAppAttemptState.RUNNING,
+          EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
           RMAppAttemptEventType.UNREGISTERED, new AMUnregisteredTransition())
       .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
           RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
@@ -225,6 +234,21 @@ public class RMAppAttemptImpl implements
               RMAppAttemptEventType.CONTAINER_ALLOCATED,
               RMAppAttemptEventType.CONTAINER_FINISHED))
 
+      // Transitions from FINISHING State
+      .addTransition(RMAppAttemptState.FINISHING,
+          EnumSet.of(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED),
+          RMAppAttemptEventType.CONTAINER_FINISHED,
+          new AMFinishingContainerFinishedTransition())
+      .addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHED,
+          RMAppAttemptEventType.EXPIRE,
+          new FinalTransition(RMAppAttemptState.FINISHED))
+      .addTransition(RMAppAttemptState.FINISHING, RMAppAttemptState.FINISHING,
+          EnumSet.of(
+              RMAppAttemptEventType.UNREGISTERED,
+              RMAppAttemptEventType.STATUS_UPDATE,
+              RMAppAttemptEventType.CONTAINER_ALLOCATED,
+              RMAppAttemptEventType.KILL))
+
       // Transitions from FINISHED State
       .addTransition(
           RMAppAttemptState.FINISHED,
@@ -583,9 +607,11 @@ public class RMAppAttemptImpl implements
   private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
     new ArrayList<ResourceRequest>();
 
-  private static final class ScheduleTransition extends BaseTransition {
+  private static final class ScheduleTransition
+      implements
+      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
     @Override
-    public void transition(RMAppAttemptImpl appAttempt,
+    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
 
       // Send the acceptance to the app
@@ -593,17 +619,27 @@ public class RMAppAttemptImpl implements
           .getApplicationAttemptId().getApplicationId(),
           RMAppEventType.APP_ACCEPTED));
 
-      // Request a container for the AM.
-      ResourceRequest request = BuilderUtils.newResourceRequest(
-          AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
-              .getAMContainerSpec().getResource(), 1);
-
-      Allocation amContainerAllocation = 
-          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
-              Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
-      if (amContainerAllocation != null
-          && amContainerAllocation.getContainers() != null) {
-        assert(amContainerAllocation.getContainers().size() == 0);
+      if (!appAttempt.submissionContext.getUnmanagedAM()) {
+        // Request a container for the AM.
+        ResourceRequest request = BuilderUtils.newResourceRequest(
+            AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
+                .getAMContainerSpec().getResource(), 1);
+
+        Allocation amContainerAllocation = appAttempt.scheduler.allocate(
+            appAttempt.applicationAttemptId,
+            Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST);
+        if (amContainerAllocation != null
+            && amContainerAllocation.getContainers() != null) {
+          assert (amContainerAllocation.getContainers().size() == 0);
+        }
+        return RMAppAttemptState.SCHEDULED;
+      } else {
+        // RM not allocating container. AM is self launched. 
+        // Directly go to LAUNCHED state
+        // Register with AMLivelinessMonitor
+        appAttempt.rmContext.getAMLivelinessMonitor().register(
+            appAttempt.applicationAttemptId);
+        return RMAppAttemptState.LAUNCHED;
       }
     }
   }
@@ -810,11 +846,32 @@ public class RMAppAttemptImpl implements
       // UnRegister from AMLivelinessMonitor
       appAttempt.rmContext.getAMLivelinessMonitor().unregister(
           appAttempt.getAppAttemptId());
+      appAttempt.rmContext.getAMFinishingMonitor().unregister(
+          appAttempt.getAppAttemptId());
 
-      // Tell the launcher to cleanup.
-      appAttempt.eventHandler.handle(new AMLauncherEvent(
-          AMLauncherEventType.CLEANUP, appAttempt));
+      if(!appAttempt.submissionContext.getUnmanagedAM()) {
+        // Tell the launcher to cleanup.
+        appAttempt.eventHandler.handle(new AMLauncherEvent(
+            AMLauncherEventType.CLEANUP, appAttempt));
+      }
+    }
+  }
+  
+  private static class UnexpectedAMRegisteredTransition extends
+      BaseFinalTransition {
+
+    public UnexpectedAMRegisteredTransition() {
+      super(RMAppAttemptState.FAILED);
     }
+
+    @Override
+    public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) {
+      assert appAttempt.submissionContext.getUnmanagedAM();
+      appAttempt
+          .setDiagnostics("Unmanaged AM must register after AM attempt reaches LAUNCHED state.");
+      super.transition(appAttempt, event);
+    }
+
   }
 
   private static final class StatusUpdateTransition extends
@@ -835,15 +892,21 @@ public class RMAppAttemptImpl implements
     }
   }
 
-  private static final class AMUnregisteredTransition extends FinalTransition {
-
-    public AMUnregisteredTransition() {
-      super(RMAppAttemptState.FINISHED);
-    }
+  private static final class AMUnregisteredTransition implements
+      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
 
     @Override
-    public void transition(RMAppAttemptImpl appAttempt,
+    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
+      ApplicationAttemptId appAttemptId = appAttempt.getAppAttemptId();
+
+      appAttempt.rmContext.getAMLivelinessMonitor().unregister(appAttemptId);
+
+      // Remove the AppAttempt from the ApplicationTokenSecretManager
+      appAttempt.rmContext.getApplicationTokenSecretManager()
+        .applicationMasterFinished(appAttemptId);
+
+      appAttempt.progress = 1.0f;
 
       RMAppAttemptUnregistrationEvent unregisterEvent
         = (RMAppAttemptUnregistrationEvent) event;
@@ -853,8 +916,20 @@ public class RMAppAttemptImpl implements
         appAttempt.generateProxyUriWithoutScheme(appAttempt.origTrackingUrl);
       appAttempt.finalStatus = unregisterEvent.getFinalApplicationStatus();
 
-      // Tell the app and the scheduler
-      super.transition(appAttempt, event);
+      // Tell the app
+      if (appAttempt.getSubmissionContext().getUnmanagedAM()) {
+        // Unmanaged AMs have no container to wait for, so they skip
+        // the FINISHING state and go straight to FINISHED.
+        new FinalTransition(RMAppAttemptState.FINISHED).transition(
+            appAttempt, event);
+        return RMAppAttemptState.FINISHED;
+      }
+      appAttempt.rmContext.getAMFinishingMonitor().register(appAttemptId);
+      ApplicationId applicationId =
+          appAttempt.getAppAttemptId().getApplicationId();
+      appAttempt.eventHandler.handle(
+          new RMAppEvent(applicationId, RMAppEventType.ATTEMPT_FINISHING));
+      return RMAppAttemptState.FINISHING;
     }
   }
 
@@ -884,8 +959,11 @@ public class RMAppAttemptImpl implements
 
       // Is this container the AmContainer? If the finished container is same as
       // the AMContainer, AppAttempt fails
-      if (appAttempt.masterContainer.getId().equals(
-          containerStatus.getContainerId())) {
+      if (appAttempt.masterContainer != null
+          && appAttempt.masterContainer.getId().equals(
+              containerStatus.getContainerId())) {
+        // container associated with AM. must not be unmanaged 
+        assert appAttempt.submissionContext.getUnmanagedAM() == false;
         // Setup diagnostic message
         appAttempt.diagnostics.append("AM Container for " +
             appAttempt.getAppAttemptId() + " exited with " +
@@ -916,6 +994,33 @@ public class RMAppAttemptImpl implements
     }
   }
 
+  private static final class AMFinishingContainerFinishedTransition
+      implements
+      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
+
+    @Override
+    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
+        RMAppAttemptEvent event) {
+
+      RMAppAttemptContainerFinishedEvent containerFinishedEvent
+        = (RMAppAttemptContainerFinishedEvent) event;
+      ContainerStatus containerStatus =
+          containerFinishedEvent.getContainerStatus();
+
+      // Is this container the ApplicationMaster container?
+      if (appAttempt.masterContainer.getId().equals(
+          containerStatus.getContainerId())) {
+        new FinalTransition(RMAppAttemptState.FINISHED).transition(
+            appAttempt, containerFinishedEvent);
+        return RMAppAttemptState.FINISHED;
+      }
+
+      // Normal container.
+      appAttempt.justFinishedContainers.add(containerStatus);
+      return RMAppAttemptState.FINISHING;
+    }
+  }
+
   @Override
   public long getStartTime() {
     this.readLock.lock();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Fri Aug  3 19:00:15 2012
@@ -19,6 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
 
 public enum RMAppAttemptState {
-  NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHED,
-  KILLED,
+  NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING,
+  FINISHING, FINISHED, KILLED,
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/ContainerAllocationExpirer.java Fri Aug  3 19:00:15 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.event.Even
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class ContainerAllocationExpirer extends
     AbstractLivelinessMonitor<ContainerId> {
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1369164&r1=1369163&r2=1369164&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java Fri Aug  3 19:00:15 2012
@@ -26,6 +26,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 
+/**
+ * Represents the ResourceManager's view of an application container. See 
+ * {@link RMContainerImpl} for an implementation. Containers may be in one
+ * of several states, given in {@link RMContainerState}. An RMContainer
+ * instance may exist even if there is no actual running container, such as 
+ * when resources are being reserved to fill space for a future container 
+ * allocation.
+ */
 public interface RMContainer extends EventHandler<RMContainerEvent> {
 
   ContainerId getContainerId();



Mime
View raw message