hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1099337 [1/3] - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ mr-client/hadoop-mapreduce-client-shuffle/src/...
Date Wed, 04 May 2011 06:53:53 GMT
Author: vinodkv
Date: Wed May  4 06:53:52 2011
New Revision: 1099337

URL: http://svn.apache.org/viewvc?rev=1099337&view=rev
Log:
Adding user log handling for YARN. Making NM put the user-logs on DFS and providing log-dump tools. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ContainerLogsRetentionPolicy.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogDumper.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppFinishedEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorAppStartedEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorContainerFinishedEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/event/LogAggregatorEventType.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
Removed:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Container.java
Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/mapreduce/branches/MR-279/yarn/bin/yarn
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/resources/container-log4j.properties
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/NodeManagerImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestApplicationCleanup.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed May  4 06:53:52 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   MAPREDUCE-279
 
+    Adding user log handling for YARN. Making NM put the user-logs on DFS and providing log-dump tools. (vinodkv)
+
     MAPREDUCE-2468. Add metrics for NM Shuffle. (Luke Lu via cdouglas)
 
     Completing RM Restart. Completed Phase 3 of making sure events are logged and restored (mahadev)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/pom.xml Wed May  4 06:53:52 2011
@@ -43,6 +43,13 @@
       <artifactId>yarn-server-resourcemanager</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+      <version>1.0-SNAPSHOT</version>
+      <type>jar</type>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Wed May  4 06:53:52 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.MapReduceChildJVM;
+import org.apache.hadoop.mapred.ShuffleHandler;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -512,9 +513,11 @@ public abstract class TaskAttemptImpl im
       LOG.info("Putting shuffle token in serviceData");
       DataOutputBuffer jobToken_dob = new DataOutputBuffer();
       jobToken.write(jobToken_dob);
-      // TODO: should depend on ShuffleHandler
-      container.setServiceData("mapreduce.shuffle", 
-          ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()));
+      container
+          .setServiceData(
+              ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID,
+              ByteBuffer.wrap(jobToken_dob.getData(), 0,
+                  jobToken_dob.getLength()));
 
       Map<String, String> env = new HashMap<String, String>();
       MRApps.setInitialClasspath(env);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Wed May  4 06:53:52 2011
@@ -18,25 +18,61 @@
 
 package org.apache.hadoop.mapred;
 
+import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.net.URL;
 import java.net.InetSocketAddress;
+import java.net.URL;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.concurrent.Executors;
-import java.util.Map;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 import javax.crypto.SecretKey;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
-
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterInt;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -67,39 +103,6 @@ import org.jboss.netty.handler.codec.htt
 import org.jboss.netty.handler.stream.ChunkedWriteHandler;
 import org.jboss.netty.util.CharsetUtil;
 
-import static org.jboss.netty.buffer.ChannelBuffers.*;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.*;
-import static org.jboss.netty.handler.codec.http.HttpMethod.*;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.*;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterInt;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-//
-// TODO packaging
 public class ShuffleHandler extends AbstractService 
     implements AuxServices.AuxiliaryService {
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/bin/yarn
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/bin/yarn?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/bin/yarn (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/bin/yarn Wed May  4 06:53:52 2011
@@ -67,6 +67,7 @@ if [ $# = 0 ]; then
   echo "  rmadmin              admin tools" 
   echo "  version              print the version"
   echo "  jar <jar>            run a jar file"
+  echo "  logs                 dump container logs"
   echo "  classpath            prints the class path needed to get the"
   echo "                       Hadoop jar and the required libraries"
   echo "  daemonlog            get/set the log level for each daemon"
@@ -304,6 +305,9 @@ elif [ "$COMMAND" = "version" ] ; then
 elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hadoop.util.RunJar
   YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
+elif [ "$COMMAND" = "logs" ] ; then
+  CLASS=org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogDumper
+  YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"
 elif [ "$COMMAND" = "daemonlog" ] ; then
   CLASS=org.apache.hadoop.log.LogLevel
   YARN_OPTS="$YARN_OPTS $YARN_CLIENT_OPTS"

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java Wed May  4 06:53:52 2011
@@ -27,6 +27,15 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 /**
  * Builder utilities to construct various objects.
@@ -34,20 +43,32 @@ import org.apache.hadoop.yarn.factories.
  */
 public class BuilderUtils {
 
-  public static class ApplicationIdComparator implements
-      Comparator<ApplicationId> {  
+  private static final RecordFactory recordFactory = RecordFactoryProvider
+      .getRecordFactory(null);
 
+  public static class ApplicationIdComparator implements
+      Comparator<ApplicationId> {
     @Override
     public int compare(ApplicationId a1, ApplicationId a2) {
       return a1.compareTo(a2);
     }
-    
+  }
+
+  public static class ContainerComparator implements
+      java.util.Comparator<Container> {
+
+    @Override
+    public int compare(Container c1,
+        Container c2) {
+      return c1.compareTo(c2);
+    }
   }
 
   public static LocalResource newLocalResource(RecordFactory recordFactory,
       URI uri, LocalResourceType type, LocalResourceVisibility visibility,
       long size, long timestamp) {
-    LocalResource resource = recordFactory.newRecordInstance(LocalResource.class);
+    LocalResource resource =
+        recordFactory.newRecordInstance(LocalResource.class);
     resource.setResource(ConverterUtils.getYarnUrlFromURI(uri));
     resource.setType(type);
     resource.setVisibility(visibility);
@@ -74,12 +95,61 @@ public class BuilderUtils {
     return applicationId;
   }
 
+  public static ApplicationId newApplicationId(long clusterTimeStamp, int id) {
+    ApplicationId applicationId =
+        recordFactory.newRecordInstance(ApplicationId.class);
+    applicationId.setId(id);
+    applicationId.setClusterTimestamp(clusterTimeStamp);
+    return applicationId;
+  }
+
+  public static ApplicationId convert(long clustertimestamp, CharSequence id) {
+    ApplicationId applicationId =
+        recordFactory.newRecordInstance(ApplicationId.class);
+    applicationId.setId(Integer.valueOf(id.toString()));
+    applicationId.setClusterTimestamp(clustertimestamp);
+    return applicationId;
+  }
+
   public static ContainerId newContainerId(RecordFactory recordFactory,
-      ApplicationId applicationId, int containerId) {
+      ApplicationId applicationId,
+      int containerId) {
     ContainerId id = recordFactory.newRecordInstance(ContainerId.class);
     id.setAppId(applicationId);
     id.setId(containerId);
     return id;
   }
 
+  public static Container clone(Container c) {
+    Container container = recordFactory.newRecordInstance(Container.class);
+    container.setId(c.getId());
+    container.setContainerToken(c.getContainerToken());
+    container.setContainerManagerAddress(c.getContainerManagerAddress());
+    container.setNodeHttpAddress(c.getNodeHttpAddress());
+    container.setResource(c.getResource());
+    container.setState(c.getState());
+    return container;
+  }
+
+  public static Container newContainer(RecordFactory recordFactory,
+      ApplicationId applicationId, int containerId,
+      String containerManagerAddress, String nodeHttpAddress,
+      Resource resource) {
+    ContainerId containerID =
+        newContainerId(recordFactory, applicationId, containerId);
+    return newContainer(containerID, containerManagerAddress,
+        nodeHttpAddress, resource);
+  }
+
+  public static Container newContainer(ContainerId containerId,
+      String containerManagerAddress, String nodeHttpAddress,
+      Resource resource) {
+    Container container = recordFactory.newRecordInstance(Container.class);
+    container.setId(containerId);
+    container.setContainerManagerAddress(containerManagerAddress);
+    container.setNodeHttpAddress(nodeHttpAddress);
+    container.setResource(resource);
+    container.setState(ContainerState.INITIALIZING);
+    return container;
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NMConfig.java Wed May  4 06:53:52 2011
@@ -55,6 +55,11 @@ public class NMConfig {
 
   public static final String DEFAULT_NM_LOG_DIR = "/tmp/logs";
 
+  public static final String REMOTE_USER_LOG_DIR = NM_PREFIX
+      + "remote-app-log-dir";
+
+  public static final String DEFAULT_REMOTE_APP_LOG_DIR = "/tmp/logs";
+
   public static final int DEFAULT_NM_VMEM_GB = 8;
 
   public static final String NM_VMEM_GB = NM_PREFIX + "resource.memory.gb";

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Wed May  4 06:53:52 2011
@@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
 import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed May  4 06:53:52 2011
@@ -146,11 +146,12 @@ public class NodeStatusUpdaterImpl exten
   protected ResourceTracker getRMClient() {
     YarnRPC rpc = YarnRPC.create(getConfig());
     InetSocketAddress rmAddress = NetUtils.createSocketAddr(this.rmAddress);
-    getConfig().setClass(
+    Configuration rmClientConf = new Configuration(getConfig());
+    rmClientConf.setClass(
         CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
         RMNMSecurityInfoClass.class, SecurityInfo.class);
     return (ResourceTracker) rpc.getProxy(ResourceTracker.class, rmAddress,
-        getConfig());
+        rmClientConf);
   }
 
   private void registerWithRM() throws YarnRemoteException {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Wed May  4 06:53:52 2011
@@ -18,12 +18,10 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
-import java.lang.reflect.Constructor;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -31,9 +29,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.service.FilterService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed May  4 06:53:52 2011
@@ -22,7 +22,9 @@ import static org.apache.hadoop.yarn.ser
 import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
 import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.avro.ipc.Server;
@@ -30,9 +32,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CleanupContainerResponse;
@@ -75,6 +81,8 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -92,7 +100,7 @@ public class ContainerManagerImpl extend
   final Context context;
   private final ContainersMonitor containersMonitor;
   private Server server;
-  private InetSocketAddress cmAddr;
+  private InetSocketAddress cmBindAddressStr;
   private final ResourceLocalizationService rsrcLocalizationSrvc;
   private final ContainersLauncher containersLauncher;
   private final AuxServices auxiluaryServices;
@@ -137,6 +145,10 @@ public class ContainerManagerImpl extend
         new ContainersMonitorImpl(exec, dispatcher);
     addService(this.containersMonitor);
 
+    LogAggregationService logAggregationService =
+        createLogAggregationService(this.deletionService);
+    addService(logAggregationService);
+
     dispatcher.register(ContainerEventType.class,
         new ContainerEventDispatcher());
     dispatcher.register(ApplicationEventType.class,
@@ -145,9 +157,15 @@ public class ContainerManagerImpl extend
     dispatcher.register(AuxServicesEventType.class, auxiluaryServices);
     dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
     dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
+    dispatcher.register(LogAggregatorEventType.class, logAggregationService);
     addService(dispatcher);
   }
 
+  protected LogAggregationService createLogAggregationService(
+      DeletionService deletionService) {
+    return new LogAggregationService(deletionService);
+  }
+
   public ContainersMonitor getContainersMonitor() {
     return this.containersMonitor;
   }
@@ -165,12 +183,9 @@ public class ContainerManagerImpl extend
 
   @Override
   public void init(Configuration conf) {
-    cmAddr = NetUtils.createSocketAddr(
+    cmBindAddressStr = NetUtils.createSocketAddr(
         conf.get(NM_BIND_ADDRESS, DEFAULT_NM_BIND_ADDRESS));
-    Configuration cmConf = new Configuration(conf);
-    cmConf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
-        ContainerManagerSecurityInfo.class, SecurityInfo.class);
-    super.init(cmConf);
+    super.init(conf);
   }
 
   @Override
@@ -187,10 +202,13 @@ public class ContainerManagerImpl extend
           this.nodeStatusUpdater.getContainerManagerBindAddress(),
           this.nodeStatusUpdater.getRMNMSharedSecret());
     }
+    Configuration cmConf = new Configuration(getConfig());
+    cmConf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ContainerManagerSecurityInfo.class, SecurityInfo.class);
     server =
-        rpc.getServer(ContainerManager.class, this, cmAddr, getConfig(),
+        rpc.getServer(ContainerManager.class, this, cmBindAddressStr, cmConf,
             this.containerTokenSecretManager);
-    LOG.info("ContainerManager started at " + cmAddr);
+    LOG.info("ContainerManager started at " + cmBindAddressStr);
     server.start();
     super.start();
   }
@@ -217,10 +235,29 @@ public class ContainerManagerImpl extend
   public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException {
     ContainerLaunchContext launchContext = request.getContainerLaunchContext();
   
-    Container container = new ContainerImpl(this.dispatcher, launchContext);
-    //ContainerID containerID = launchContext.id;
+    // parse credentials
+    ByteBuffer tokens = launchContext.getContainerTokens();
+    Credentials credentials = new Credentials();
+    if (tokens != null) {
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      tokens.rewind();
+      buf.reset(tokens);
+      try {
+        credentials.readTokenStorageStream(buf);
+        if (LOG.isDebugEnabled()) {
+          for (Token<? extends TokenIdentifier> tk : credentials
+              .getAllTokens()) {
+            LOG.debug(tk.getService() + " = " + tk.toString());
+          }
+        }
+      } catch (IOException e) {
+        throw RPCUtil.getRemoteException(e);
+      }
+    }
+
+    Container container =
+        new ContainerImpl(this.dispatcher, launchContext, credentials);
     ContainerId containerID = launchContext.getContainerId();
-    //ApplicationID applicationID = containerID.appID;
     ApplicationId applicationID = containerID.getAppId();
     if (context.getContainers().putIfAbsent(containerID, container) != null) {
       throw RPCUtil.getRemoteException("Container " + containerID
@@ -229,7 +266,7 @@ public class ContainerManagerImpl extend
 
     // Create the application
     Application application = new ApplicationImpl(dispatcher,
-        launchContext.getUser(), applicationID);
+        launchContext.getUser(), applicationID, credentials);
     if (null ==
         context.getApplications().putIfAbsent(applicationID, application)) {
       LOG.info("Creating a new application reference for app "

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Wed May  4 06:53:52 2011
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -35,6 +36,9 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.ContainerLogsRetentionPolicy;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -47,7 +51,7 @@ public class ApplicationImpl implements 
   final Dispatcher dispatcher;
   final String user;
   final ApplicationId appId;
-  Path logDir;
+  final Credentials credentials;
 
   private static final Log LOG = LogFactory.getLog(Application.class);
 
@@ -55,10 +59,11 @@ public class ApplicationImpl implements 
       new HashMap<ContainerId, Container>();
 
   public ApplicationImpl(Dispatcher dispatcher, String user,
-      ApplicationId appId) {
+      ApplicationId appId, Credentials credentials) {
     this.dispatcher = dispatcher;
     this.user = user.toString();
     this.appId = appId;
+    this.credentials = credentials;
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -136,7 +141,8 @@ public class ApplicationImpl implements 
            // Transitions from APPLICATION_RESOURCES_CLEANINGUP state
            .addTransition(ApplicationState.APPLICATION_RESOURCES_CLEANINGUP,
                ApplicationState.FINISHED,
-               ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)
+               ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP,
+               new AppCompletelyDoneTransition())
 
            // create the topology tables
            .installTopology();
@@ -178,12 +184,18 @@ public class ApplicationImpl implements 
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
-      // Start all the containers waiting for ApplicationInit
+
       ApplicationInitedEvent initedEvent = (ApplicationInitedEvent) event;
-      app.logDir = initedEvent.getLogDir();
+      // Inform the logAggregator
+      app.dispatcher.getEventHandler().handle(
+            new LogAggregatorAppStartedEvent(app.appId, app.user,
+                app.credentials,
+                ContainerLogsRetentionPolicy.ALL_CONTAINERS)); // TODO: Fix
+
+      // Start all the containers waiting for ApplicationInit
       for (Container container : app.containers.values()) {
         app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
-              container.getContainerID(), app.logDir));
+              container.getContainerID()));
       }
     }
   }
@@ -198,7 +210,7 @@ public class ApplicationImpl implements 
       LOG.info("Adding " + container.getContainerID()
           + " to application " + app.toString());
       app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
-            container.getContainerID(), app.logDir));
+            container.getContainerID()));
     }
   }
 
@@ -278,6 +290,16 @@ public class ApplicationImpl implements 
 
   }
 
+  static class AppCompletelyDoneTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      // Inform the logService
+      app.dispatcher.getEventHandler().handle(
+          new LogAggregatorAppFinishedEvent(app.appId));
+    }
+  }
+
   @Override
   public synchronized void handle(ApplicationEvent event) {
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java Wed May  4 06:53:52 2011
@@ -17,20 +17,12 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 
 public class ApplicationInitedEvent extends ApplicationEvent {
 
-  private final Path logDir;
-
-  public ApplicationInitedEvent(ApplicationId appID, Path logDir) {
+  public ApplicationInitedEvent(ApplicationId appID) {
     super(appID, ApplicationEventType.APPLICATION_INITED);
-    this.logDir = logDir;
-  }
-
-  public Path getLogDir() {
-    return logDir;
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Wed May  4 06:53:52 2011
@@ -18,10 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
-import java.io.IOException;
-
 import java.net.URISyntaxException;
-
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.EnumSet;
@@ -31,16 +28,14 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -50,8 +45,9 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalResourceRequest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationRequestEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
@@ -66,7 +62,7 @@ public class ContainerImpl implements Co
   private final Dispatcher dispatcher;
   private final Credentials credentials;
   private final ContainerLaunchContext launchContext;
-  private int exitCode;
+  private String exitCode = "NA";
   private final StringBuilder diagnostics;
 
   private static final Log LOG = LogFactory.getLog(Container.class);
@@ -77,11 +73,12 @@ public class ContainerImpl implements Co
     new HashMap<Path,String>();
 
   public ContainerImpl(Dispatcher dispatcher,
-      ContainerLaunchContext launchContext) {
+      ContainerLaunchContext launchContext, Credentials creds) {
     this.dispatcher = dispatcher;
     this.launchContext = launchContext;
     this.diagnostics = new StringBuilder();
-    this.credentials = new Credentials();
+    this.credentials = creds;
+
     stateMachine = stateMachineFactory.make(this);
   }
 
@@ -304,29 +301,6 @@ public class ContainerImpl implements Co
         ContainerEvent event) {
       final ContainerLaunchContext ctxt = container.getLaunchContext();
 
-      // parse credentials
-      ByteBuffer creds = ctxt.getContainerTokens();
-      if (creds != null) {
-        try {
-          DataInputByteBuffer buf = new DataInputByteBuffer();
-          creds.rewind();
-          buf.reset(creds);
-          container.credentials.readTokenStorageStream(buf);
-          if (LOG.isDebugEnabled()) {
-            for (Token<? extends TokenIdentifier> tk :
-                 container.credentials.getAllTokens()) {
-              LOG.debug(tk.getService() + " = " + tk.toString());
-            }
-          }
-        } catch (IOException e) {
-          // invalid credentials
-          container.dispatcher.getEventHandler().handle(
-              new ContainerLocalizationEvent(
-                LocalizationEventType.CLEANUP_CONTAINER_RESOURCES, container));
-          return ContainerState.LOCALIZING;
-        }
-      }
-
       // Inform the AuxServices about the opaque serviceData
       Map<String,ByteBuffer> csd = ctxt.getAllServiceData();
       if (csd != null) {
@@ -458,7 +432,7 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
-      container.exitCode = exitEvent.getExitCode();
+      container.exitCode = String.valueOf(exitEvent.getExitCode());
 
       // TODO: Add containerWorkDir to the deletion service.
       // TODO: Add containerOuputDir to the deletion service.
@@ -503,7 +477,7 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       ContainerExitEvent exitEvent = (ContainerExitEvent) event;
-      container.exitCode = exitEvent.getExitCode();
+      container.exitCode = String.valueOf(exitEvent.getExitCode());
 
       // The process/process-grp is killed. Decrement reference counts and
       // cleanup resources
@@ -519,11 +493,14 @@ public class ContainerImpl implements Co
     @Override
     public void transition(ContainerImpl container, ContainerEvent event) {
       // Inform the application
-      container.dispatcher.getEventHandler().handle(
-          new ApplicationContainerFinishedEvent(container.getContainerID()));
+      ContainerId containerID = container.getContainerID();
+      EventHandler eventHandler = container.dispatcher.getEventHandler();
+      eventHandler.handle(new ApplicationContainerFinishedEvent(containerID));
       // Remove the container from the resource-monitor
-      container.dispatcher.getEventHandler().handle(
-          new ContainerStopMonitoringEvent(container.getContainerID()));
+      eventHandler.handle(new ContainerStopMonitoringEvent(containerID));
+      // Tell the logService too
+      eventHandler.handle(new LogAggregatorContainerFinishedEvent(
+          containerID, container.exitCode));
     }
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerInitEvent.java Wed May  4 06:53:52 2011
@@ -17,20 +17,12 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 
 public class ContainerInitEvent extends ContainerEvent {
 
-  private final Path logDir;
-
-  public ContainerInitEvent(ContainerId c, Path logDir) {
+  public ContainerInitEvent(ContainerId c) {
     super(c, ContainerEventType.INIT_CONTAINER);
-    this.logDir = logDir;
-  }
-
-  public Path getLogDir() {
-    return logDir;
   }
 
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1099337&r1=1099336&r2=1099337&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed May  4 06:53:52 2011
@@ -237,7 +237,7 @@ public class ResourceLocalizationService
       }
       // 1) Signal container init
       dispatcher.getEventHandler().handle(new ApplicationInitedEvent(
-            app.getAppId(), logDirs.get(0)));
+            app.getAppId()));
       break;
     case INIT_CONTAINER_RESOURCES:
       ContainerLocalizationRequestEvent rsrcReqs =

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AggregatedLogFormat.java Wed May  4 06:53:52 2011
@@ -0,0 +1,247 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.EnumSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.file.tfile.TFile;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class AggregatedLogFormat {
+
+  static final Log LOG = LogFactory.getLog(AggregatedLogFormat.class);
+
+  public static class LogKey implements Writable {
+
+    private String containerId;
+
+    public LogKey() {
+
+    }
+
+    public LogKey(ContainerId containerId) {
+      this.containerId = ConverterUtils.toString(containerId);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeUTF(this.containerId);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      this.containerId = in.readUTF();
+    }
+
+    @Override
+    public String toString() {
+      return this.containerId;
+    }
+  }
+
+  public static class LogValue {
+
+    private final File containerLogDir;
+
+    public LogValue(File containerLogDir) {
+      this.containerLogDir = containerLogDir;
+    }
+
+    public void write(DataOutputStream out) throws IOException {
+      if (!this.containerLogDir.isDirectory()) {
+        return; // ContainerDir may have been deleted by the user.
+      }
+
+      for (File logFile : this.containerLogDir.listFiles()) {
+
+        // Write the logFile Type
+        out.writeUTF(logFile.getName());
+
+        // Write the log length as UTF so that it is printable
+        out.writeUTF(String.valueOf(logFile.length()));
+
+        // Write the log itself
+        FileInputStream in = new FileInputStream(logFile);
+        byte[] buf = new byte[65535];
+        int len = 0;
+        while ((len = in.read(buf)) != -1) {
+          out.write(buf, 0, len);
+        }
+      }
+    }
+  }
+
+  public static class LogWriter {
+
+    private final FSDataOutputStream fsDataOStream;
+    private final TFile.Writer writer;
+
+    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
+        UserGroupInformation userUgi) throws IOException {
+      try {
+        this.fsDataOStream =
+            userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
+              @Override
+              public FSDataOutputStream run() throws Exception {
+                return FileContext.getFileContext(conf).create(
+                    remoteAppLogFile,
+                    EnumSet.of(CreateFlag.CREATE), new Options.CreateOpts[] {});
+              }
+            });
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      }
+
+      // Keys are not sorted: null arg
+      // 256KB minBlockSize : Expected log size for each container too
+      this.writer =
+          new TFile.Writer(this.fsDataOStream, 256 * 1024, conf.get(
+              LogAggregationService.LOG_COMPRESSION_TYPE,
+              LogAggregationService.DEFAULT_COMPRESSION_TYPE), null, conf);
+    }
+
+    public void append(LogKey logKey, LogValue logValue) throws IOException {
+      DataOutputStream out = this.writer.prepareAppendKey(-1);
+      logKey.write(out);
+      out.close();
+      out = this.writer.prepareAppendValue(-1);
+      logValue.write(out);
+      out.close();
+      this.fsDataOStream.hflush();
+    }
+
+    public void closeWriter() {
+      try {
+        this.writer.close();
+      } catch (IOException e) {
+        LOG.warn("Exception closing writer", e);
+      }
+      try {
+        this.fsDataOStream.close();
+      } catch (IOException e) {
+        LOG.warn("Exception closing output-stream", e);
+      }
+    }
+  }
+
+  public static class LogReader {
+
+    private final FSDataInputStream fsDataIStream;
+    private final TFile.Reader.Scanner scanner;
+
+    public LogReader(Configuration conf, Path remoteAppLogFile)
+        throws IOException {
+      FileContext fileContext = FileContext.getFileContext(conf);
+      this.fsDataIStream = fileContext.open(remoteAppLogFile);
+      TFile.Reader reader =
+          new TFile.Reader(this.fsDataIStream, fileContext.getFileStatus(
+              remoteAppLogFile).getLen(), conf);
+      this.scanner = reader.createScanner();
+    }
+
+    private boolean atBeginning = true;
+
+    /**
+     * Read the next key and return the value-stream.
+     * 
+     * @param key
+     * @return the valueStream if there are more keys or null otherwise.
+     * @throws IOException
+     */
+    public DataInputStream next(LogKey key) throws IOException {
+      if (!this.atBeginning) {
+        this.scanner.advance();
+      } else {
+        this.atBeginning = false;
+      }
+      if (this.scanner.atEnd()) {
+        return null;
+      }
+      TFile.Reader.Scanner.Entry entry = this.scanner.entry();
+      key.readFields(entry.getKeyStream());
+      DataInputStream valueStream = entry.getValueStream();
+      return valueStream;
+    }
+
+    /**
+     * Keep calling this till you get a {@link EOFException} for getting logs of
+     * all types for a single container.
+     * 
+     * @param valueStream
+     * @param out
+     * @throws IOException
+     */
+    public static void readAContainerLogsForALogType(
+        DataInputStream valueStream, DataOutputStream out)
+          throws IOException {
+
+      byte[] buf = new byte[65535];
+
+      String fileType = valueStream.readUTF();
+      String fileLengthStr = valueStream.readUTF();
+      long fileLength = Long.parseLong(fileLengthStr);
+      out.writeUTF("\nLogType:");
+      out.writeUTF(fileType);
+      out.writeUTF("\nLogLength:");
+      out.writeUTF(fileLengthStr);
+      out.writeUTF("\nLog Contents:\n");
+
+      int curRead = 0;
+      long pendingRead = fileLength - curRead;
+      int toRead =
+                pendingRead > buf.length ? buf.length : (int) pendingRead;
+      int len = valueStream.read(buf, 0, toRead);
+      while (len != -1 && curRead < fileLength) {
+        out.write(buf, 0, len);
+        curRead += len;
+
+        pendingRead = fileLength - curRead;
+        toRead =
+                  pendingRead > buf.length ? buf.length : (int) pendingRead;
+        len = valueStream.read(buf, 0, toRead);
+      }
+    }
+
+    public void close() throws IOException {
+      this.scanner.close();
+      this.fsDataIStream.close();
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Wed May  4 06:53:52 2011
@@ -0,0 +1,32 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public interface AppLogAggregator extends Runnable {
+
+  void startContainerLogAggregation(ContainerId containerId,
+      boolean wasContainerSuccessful);
+
+  void finishLogAggregation();
+
+  void join();
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed May  4 06:53:52 2011
@@ -0,0 +1,213 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class AppLogAggregatorImpl implements AppLogAggregator {
+
+  private static final Log LOG = LogFactory
+      .getLog(AppLogAggregatorImpl.class);
+  private static final int THREAD_SLEEP_TIME = 1000;
+
+  private final ApplicationId applicationId;
+  private boolean logAggregationDisabled = false;
+  private final Configuration conf;
+  private final DeletionService delService;
+  private final UserGroupInformation userUgi;
+  private final File localAppLogDir;
+  private final Path remoteNodeLogFileForApp;
+  private final ContainerLogsRetentionPolicy retentionPolicy;
+
+  private final BlockingQueue<ContainerId> pendingContainers;
+  private final AtomicBoolean appFinishing = new AtomicBoolean();
+  private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
+
+  private LogWriter writer = null;
+
+  public AppLogAggregatorImpl(DeletionService deletionService,
+      Configuration conf, ApplicationId appId, UserGroupInformation userUgi,
+      File localAppLogDir, Path remoteNodeLogFileForApp,
+      ContainerLogsRetentionPolicy retentionPolicy) {
+    this.conf = conf;
+    this.delService = deletionService;
+    this.applicationId = appId;
+    this.userUgi = userUgi;
+    this.localAppLogDir = localAppLogDir;
+    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+    this.retentionPolicy = retentionPolicy;
+    this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
+  }
+
+  private File getLocalContainerLogDir(ContainerId containerId) {
+    return new File(this.localAppLogDir, ConverterUtils.toString(containerId));
+  }
+
+  private void uploadLogsForContainer(ContainerId containerId) {
+
+    if (this.logAggregationDisabled) {
+      return;
+    }
+
+    // Lazy creation of the writer
+    if (this.writer == null) {
+      LOG.info("Starting aggregate log-file for app " + this.applicationId);
+      try {
+        this.writer =
+            new LogWriter(this.conf, this.remoteNodeLogFileForApp,
+                this.userUgi);
+      } catch (IOException e) {
+        LOG.error("Cannot create writer for app " + this.applicationId
+            + ". Disabling log-aggregation for this app.", e);
+        this.logAggregationDisabled = true;
+        return;
+      }
+    }
+
+    File containerLogDir = getLocalContainerLogDir(containerId);
+    LOG.info("Uploading logs for container " + containerId + " from "
+        + containerLogDir);
+    LogKey logKey = new LogKey(containerId);
+    LogValue logValue = new LogValue(containerLogDir);
+    try {
+      this.writer.append(logKey, logValue);
+    } catch (IOException e) {
+      LOG.error("Couldn't upload logs for " + containerId
+          + ". Skipping this container.");
+    }
+  }
+
+  @Override
+  public void run() {
+
+    ContainerId containerId;
+
+    while (!this.appFinishing.get()) {
+      try {
+        containerId = this.pendingContainers.poll();
+        if (containerId == null) {
+          Thread.sleep(THREAD_SLEEP_TIME);
+        } else {
+          uploadLogsForContainer(containerId);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("PendingContainers queue is interrupted");
+      }
+    }
+
+    // Application is finished. Finish pending-containers
+    while ((containerId = this.pendingContainers.poll()) != null) {
+      uploadLogsForContainer(containerId);
+    }
+
+    // Remove the local app-log-dir
+    this.delService.delete(this.userUgi.getShortUserName(), new Path(
+        this.localAppLogDir.getAbsolutePath()), new Path[] {});
+
+    if (this.writer != null) {
+      this.writer.closeWriter();
+      LOG.info("Finished aggregate log-file for app " + this.applicationId);
+    }
+
+    this.appAggregationFinished.set(true);
+  }
+
+  private boolean shouldUploadLogs(ContainerId containerId,
+      boolean wasContainerSuccessful) {
+
+    // All containers
+    if (this.retentionPolicy
+        .equals(ContainerLogsRetentionPolicy.ALL_CONTAINERS)) {
+      return true;
+    }
+
+    // AM Container only
+    if (this.retentionPolicy
+        .equals(ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY)) {
+      if (containerId.getId() == 1) {
+        return true;
+      }
+      return false;
+    }
+
+    // AM + Failing containers
+    if (this.retentionPolicy
+        .equals(ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY)) {
+      if (containerId.getId() == 1) {
+        return true;
+      } else if(!wasContainerSuccessful) {
+        return true;
+      }
+      return false;
+    }
+    return false;
+  }
+
+  @Override
+  public void startContainerLogAggregation(ContainerId containerId,
+      boolean wasContainerSuccessful) {
+    if (shouldUploadLogs(containerId, wasContainerSuccessful)) {
+      LOG.info("Considering container " + containerId
+          + " for log-aggregation");
+      this.pendingContainers.add(containerId);
+    }
+  }
+
+  @Override
+  public void finishLogAggregation() {
+    LOG.info("Application just finished : " + this.applicationId);
+    this.appFinishing.set(true);
+  }
+
+  @Override
+  public void join() {
+    // Aggregation service is finishing
+    this.finishLogAggregation();
+
+    while (!this.appAggregationFinished.get()) {
+      LOG.info("Waiting for aggregation to complete for "
+          + this.applicationId);
+      try {
+        Thread.sleep(THREAD_SLEEP_TIME);
+      } catch (InterruptedException e) {
+        LOG.warn("Join interrupted. Some logs may not have been aggregated!!");
+        break;
+      }
+    }
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ContainerLogsRetentionPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ContainerLogsRetentionPolicy.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ContainerLogsRetentionPolicy.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/ContainerLogsRetentionPolicy.java Wed May  4 06:53:52 2011
@@ -0,0 +1,5 @@
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+public enum ContainerLogsRetentionPolicy {
+  APPLICATION_MASTER_ONLY, AM_AND_FAILED_CONTAINERS_ONLY, ALL_CONTAINERS 
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1099337&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed May  4 06:53:52 2011
@@ -0,0 +1,232 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NMConfig;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorAppStartedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.event.LogAggregatorEvent;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class LogAggregationService extends AbstractService implements
+    EventHandler<LogAggregatorEvent> {
+
+  private static final Log LOG = LogFactory
+      .getLog(LogAggregationService.class);
+
+  private final DeletionService deletionService;
+
+  private File localRootLogDir;
+  Path remoteRootLogDir;
+  private String nodeFile;
+
+  static final String LOG_COMPRESSION_TYPE = NMConfig.NM_PREFIX
+      + "logaggregation.log_compression_type";
+  static final String DEFAULT_COMPRESSION_TYPE = "none";
+
+  private static final String LOG_RENTENTION_POLICY_CONFIG_KEY =
+      NMConfig.NM_PREFIX + "logaggregation.retain-policy";
+
+  private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
+
+  private final ExecutorService threadPool;
+
+  public LogAggregationService(DeletionService deletionService) {
+    super(LogAggregationService.class.getName());
+    this.deletionService = deletionService;
+    this.appLogAggregators =
+        new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
+    this.threadPool = Executors.newCachedThreadPool();
+  }
+
+  public synchronized void init(Configuration conf) {
+    this.localRootLogDir =
+        new File(conf.get(NMConfig.NM_LOG_DIR, NMConfig.DEFAULT_NM_LOG_DIR));
+    this.remoteRootLogDir =
+        new Path(conf.get(NMConfig.REMOTE_USER_LOG_DIR,
+            NMConfig.DEFAULT_REMOTE_APP_LOG_DIR));
+    super.init(conf);
+  }
+
+  @Override
+  public synchronized void start() {
+    String address =
+        getConfig().get(NM_BIND_ADDRESS, DEFAULT_NM_BIND_ADDRESS);
+    String[] splits = address.split(":");
+    this.nodeFile = splits[0] + "_" + splits[1];
+    super.start();
+  }
+
+  Path getRemoteNodeLogFileForApp(ApplicationId appId) {
+    return getRemoteNodeLogFileForApp(this.remoteRootLogDir, appId,
+        this.nodeFile);
+  }
+
+  static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
+      ApplicationId appId, String nodeFile) {
+    return new Path(getRemoteAppLogDir(remoteRootLogDir, appId),
+        nodeFile);
+  }
+
+  static Path getRemoteAppLogDir(Path remoteRootLogDir,
+      ApplicationId appId) {
+    return new Path(remoteRootLogDir, ConverterUtils.toString(appId));
+  }
+
+  File getLocalAppLogDir(ApplicationId appId) {
+    return new File(this.localRootLogDir, ConverterUtils.toString(appId));
+  }
+
+  @Override
+  public synchronized void stop() {
+    LOG.info(this.getName() + " waiting for pending aggregation during exit");
+    for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
+      appLogAggregator.join();
+    }
+    super.stop();
+  }
+
+  private void initApp(final ApplicationId appId, String user,
+      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy) {
+
+    // Get user's FileSystem credentials
+    UserGroupInformation userUgi =
+        UserGroupInformation.createRemoteUser(user);
+    if (credentials != null) {
+      for (Token<? extends TokenIdentifier> token : credentials
+          .getAllTokens()) {
+        userUgi.addToken(token);
+      }
+    }
+
+    // New application
+    AppLogAggregator appLogAggregator =
+        new AppLogAggregatorImpl(this.deletionService, getConfig(), appId,
+            userUgi, getLocalAppLogDir(appId),
+            getRemoteNodeLogFileForApp(appId), logRetentionPolicy);
+    if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
+      throw new YarnException("Duplicate initApp for " + appId);
+    }
+
+    // Create the app dir
+    try {
+      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          // TODO: Reuse FS for user?
+          FileSystem remoteFS = FileSystem.get(getConfig());
+          remoteFS.mkdirs(getRemoteAppLogDir(
+              LogAggregationService.this.remoteRootLogDir, appId)
+              .makeQualified(remoteFS.getUri(),
+                  remoteFS.getWorkingDirectory()));
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      throw new YarnException(e);
+    }
+
+    // Get the user configuration for the list of containers that need log
+    // aggregation.
+
+    // Schedule the aggregator.
+    this.threadPool.execute(appLogAggregator);
+  }
+
+  private void stopContainer(ContainerId containerId, String exitCode) {
+
+    // A container is complete. Put this containers' logs up for aggregation if
+    // this containers' logs are needed.
+
+    if (!this.appLogAggregators.containsKey(containerId.getAppId())) {
+      throw new YarnException("Application is not initialized yet for "
+          + containerId);
+    }
+    this.appLogAggregators.get(containerId.getAppId())
+        .startContainerLogAggregation(containerId, exitCode.equals("0"));
+  }
+
+  private void stopApp(ApplicationId appId) {
+
+    // App is complete. Finish up any containers' pending log aggregation and
+    // close the application specific logFile.
+
+    if (!this.appLogAggregators.containsKey(appId)) {
+      throw new YarnException("Application is not initialized yet for "
+          + appId);
+    }
+    this.appLogAggregators.get(appId).finishLogAggregation();
+  }
+
+  @Override
+  public void handle(LogAggregatorEvent event) {
+    switch (event.getType()) {
+    case APPLICATION_STARTED:
+      LogAggregatorAppStartedEvent appStartEvent =
+          (LogAggregatorAppStartedEvent) event;
+      initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
+          appStartEvent.getCredentials(),
+          appStartEvent.getLogRetentionPolicy());
+      break;
+    case CONTAINER_FINISHED:
+      LogAggregatorContainerFinishedEvent containerFinishEvent =
+          (LogAggregatorContainerFinishedEvent) event;
+      stopContainer(containerFinishEvent.getContainerId(),
+          containerFinishEvent.getExitCode());
+      break;
+    case APPLICATION_FINISHED:
+      LogAggregatorAppFinishedEvent appFinishedEvent =
+          (LogAggregatorAppFinishedEvent) event;
+      stopApp(appFinishedEvent.getApplicationId());
+      break;
+    default:
+      ; // Ignore
+    }
+  }
+}



Mime
View raw message