hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject svn commit: r1208644 [2/3] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce...
Date Wed, 30 Nov 2011 18:27:20 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Nov 30 18:27:04 2011
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
@@ -120,7 +121,8 @@ public class ContainerManagerImpl extend
   private ContainerTokenSecretManager containerTokenSecretManager;
 
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  
+
+  protected LocalDirsHandlerService dirsHandler;
   protected final AsyncDispatcher dispatcher;
   private final ApplicationACLsManager aclsManager;
 
@@ -129,9 +131,12 @@ public class ContainerManagerImpl extend
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       NodeManagerMetrics metrics, ContainerTokenSecretManager 
-      containerTokenSecretManager, ApplicationACLsManager aclsManager) {
+      containerTokenSecretManager, ApplicationACLsManager aclsManager,
+      LocalDirsHandlerService dirsHandler) {
     super(ContainerManagerImpl.class.getName());
     this.context = context;
+    this.dirsHandler = dirsHandler;
+
     dispatcher = new AsyncDispatcher();
     this.deletionService = deletionContext;
     this.metrics = metrics;
@@ -190,9 +195,10 @@ public class ContainerManagerImpl extend
     if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
         YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
       return new LogAggregationService(this.dispatcher, context,
-          deletionService);
+          deletionService, dirsHandler);
     } else {
-      return new NonAggregatingLogHandler(this.dispatcher, deletionService);
+      return new NonAggregatingLogHandler(this.dispatcher, deletionService,
+                                          dirsHandler);
     }
   }
 
@@ -203,12 +209,12 @@ public class ContainerManagerImpl extend
   protected ResourceLocalizationService createResourceLocalizationService(
       ContainerExecutor exec, DeletionService deletionContext) {
     return new ResourceLocalizationService(this.dispatcher, exec,
-        deletionContext);
+        deletionContext, dirsHandler);
   }
 
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
-    return new ContainersLauncher(context, this.dispatcher, exec);
+    return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler);
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerExitEvent.java Wed Nov 30 18:27:04 2011
@@ -22,14 +22,20 @@ import org.apache.hadoop.yarn.api.record
 
 public class ContainerExitEvent extends ContainerEvent {
   private int exitCode;
+  private final String diagnosticInfo;
 
   public ContainerExitEvent(ContainerId cID, ContainerEventType eventType,
-      int exitCode) {
+      int exitCode, String diagnosticInfo) {
     super(cID, eventType);
     this.exitCode = exitCode;
+    this.diagnosticInfo = diagnosticInfo;
   }
 
   public int getExitCode() {
     return this.exitCode;
   }
+
+  public String getDiagnosticInfo() {
+    return diagnosticInfo;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Wed Nov 30 18:27:04 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.DelayedProcessKiller;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
@@ -78,7 +79,6 @@ public class ContainerLaunch implements 
   private final Application app;
   private final Container container;
   private final Configuration conf;
-  private final LocalDirAllocator logDirsSelector;
   
   private volatile AtomicBoolean shouldLaunchContainer = new AtomicBoolean(false);
   private volatile AtomicBoolean completed = new AtomicBoolean(false);
@@ -88,14 +88,17 @@ public class ContainerLaunch implements 
 
   private Path pidFilePath = null;
 
+  private final LocalDirsHandlerService dirsHandler;
+
   public ContainerLaunch(Configuration configuration, Dispatcher dispatcher,
-      ContainerExecutor exec, Application app, Container container) {
+      ContainerExecutor exec, Application app, Container container,
+      LocalDirsHandlerService dirsHandler) {
     this.conf = configuration;
     this.app = app;
     this.exec = exec;
     this.container = container;
     this.dispatcher = dispatcher;
-    this.logDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
+    this.dirsHandler = dirsHandler;
     this.sleepDelayBeforeSigKill =
         conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
             YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS);
@@ -121,9 +124,8 @@ public class ContainerLaunch implements 
       List<String> newCmds = new ArrayList<String>(command.size());
       String appIdStr = app.getAppId().toString();
       Path containerLogDir =
-          this.logDirsSelector.getLocalPathForWrite(ContainerLaunch
-              .getRelativeContainerLogDir(appIdStr, containerIdStr),
-              LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
+          dirsHandler.getLogPathForWrite(ContainerLaunch
+              .getRelativeContainerLogDir(appIdStr, containerIdStr), false);
       for (String str : command) {
         // TODO: Should we instead work via symlinks without this grammar?
         newCmds.add(str.replace(ApplicationConstants.LOG_DIR_EXPANSION_VAR,
@@ -144,47 +146,49 @@ public class ContainerLaunch implements 
       // /////////////////////////// End of variable expansion
 
       FileContext lfs = FileContext.getLocalFSFileContext();
-      LocalDirAllocator lDirAllocator =
-          new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS); // TODO
 
       Path nmPrivateContainerScriptPath =
-          lDirAllocator.getLocalPathForWrite(
+          dirsHandler.getLocalPathForWrite(
               getContainerPrivateDir(appIdStr, containerIdStr) + Path.SEPARATOR
-                  + CONTAINER_SCRIPT, this.conf);
+                  + CONTAINER_SCRIPT);
       Path nmPrivateTokensPath =
-          lDirAllocator.getLocalPathForWrite(
+          dirsHandler.getLocalPathForWrite(
               getContainerPrivateDir(appIdStr, containerIdStr)
                   + Path.SEPARATOR
                   + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
-                      containerIdStr), this.conf);
+                      containerIdStr));
 
       DataOutputStream containerScriptOutStream = null;
       DataOutputStream tokensOutStream = null;
 
       // Select the working directory for the container
       Path containerWorkDir =
-          lDirAllocator.getLocalPathForWrite(ContainerLocalizer.USERCACHE
+          dirsHandler.getLocalPathForWrite(ContainerLocalizer.USERCACHE
               + Path.SEPARATOR + user + Path.SEPARATOR
               + ContainerLocalizer.APPCACHE + Path.SEPARATOR + appIdStr
               + Path.SEPARATOR + containerIdStr,
-              LocalDirAllocator.SIZE_UNKNOWN, this.conf, false);
+              LocalDirAllocator.SIZE_UNKNOWN, false);
 
       String pidFileSuffix = String.format(ContainerLaunch.PID_FILE_NAME_FMT,
           containerIdStr);
 
       // pid file should be in nm private dir so that it is not 
       // accessible by users
-      pidFilePath = lDirAllocator.getLocalPathForWrite(
+      pidFilePath = dirsHandler.getLocalPathForWrite(
           ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR 
-          + pidFileSuffix,
-          this.conf);
+          + pidFileSuffix);
+      List<String> localDirs = dirsHandler.getLocalDirs();
+      List<String> logDirs = dirsHandler.getLogDirs();
+
+      if (!dirsHandler.areDisksHealthy()) {
+        ret = ExitCode.DISKS_FAILED.getExitCode();
+        throw new IOException("Most of the disks failed. "
+            + dirsHandler.getDisksHealthReport());
+      }
 
       try {
         // /////////// Write out the container-script in the nmPrivate space.
-        String[] localDirs =
-            this.conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS,
-                YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
-        List<Path> appDirs = new ArrayList<Path>(localDirs.length);
+        List<Path> appDirs = new ArrayList<Path>(localDirs.size());
         for (String localDir : localDirs) {
           Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
           Path userdir = new Path(usersdir, user);
@@ -234,30 +238,34 @@ public class ContainerLaunch implements 
       }
       else {
         exec.activateContainer(containerID, pidFilePath);
-        ret =
-            exec.launchContainer(container, nmPrivateContainerScriptPath,
-                nmPrivateTokensPath, user, appIdStr, containerWorkDir);
+        ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
+                nmPrivateTokensPath, user, appIdStr, containerWorkDir,
+                localDirs, logDirs);
       }
     } catch (Throwable e) {
-      LOG.warn("Failed to launch container", e);
+      LOG.warn("Failed to launch container.", e);
       dispatcher.getEventHandler().handle(new ContainerExitEvent(
             launchContext.getContainerId(),
-            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
+            e.getMessage()));
       return ret;
     } finally {
       completed.set(true);
       exec.deactivateContainer(containerID);
     }
 
-    LOG.debug("Container " + containerIdStr + " completed with exit code "
-        + ret);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Container " + containerIdStr + " completed with exit code "
+                + ret);
+    }
     if (ret == ExitCode.FORCE_KILLED.getExitCode()
         || ret == ExitCode.TERMINATED.getExitCode()) {
       // If the process was killed, Send container_cleanedup_after_kill and
       // just break out of this method.
       dispatcher.getEventHandler().handle(
             new ContainerExitEvent(launchContext.getContainerId(),
-                ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret));
+                ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ret,
+                "Container exited with a non-zero exit code " + ret));
       return ret;
     }
 
@@ -265,7 +273,8 @@ public class ContainerLaunch implements 
       LOG.warn("Container exited with a non-zero exit code " + ret);
       this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
               launchContext.getContainerId(),
-              ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret));
+              ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ret,
+              "Container exited with a non-zero exit code " + ret));
       return ret;
     }
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Wed Nov 30 18:27:04 2011
@@ -33,10 +33,10 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -59,6 +59,8 @@ public class ContainersLauncher extends 
   private final Context context;
   private final ContainerExecutor exec;
   private final Dispatcher dispatcher;
+
+  private LocalDirsHandlerService dirsHandler;
   private final ExecutorService containerLauncher =
     Executors.newCachedThreadPool(
         new ThreadFactoryBuilder()
@@ -80,11 +82,12 @@ public class ContainersLauncher extends 
 
 
   public ContainersLauncher(Context context, Dispatcher dispatcher,
-      ContainerExecutor exec) {
+      ContainerExecutor exec, LocalDirsHandlerService dirsHandler) {
     super("containers-launcher");
     this.exec = exec;
     this.context = context;
     this.dispatcher = dispatcher;
+    this.dirsHandler = dirsHandler;
   }
 
   @Override
@@ -114,15 +117,19 @@ public class ContainersLauncher extends 
         Application app =
           context.getApplications().get(
               containerId.getApplicationAttemptId().getApplicationId());
-      ContainerLaunch launch =
-          new ContainerLaunch(getConfig(), dispatcher, exec, app,
-              event.getContainer());
+
+        ContainerLaunch launch = new ContainerLaunch(getConfig(), dispatcher,
+            exec, app, event.getContainer(), dirsHandler);
         running.put(containerId,
             new RunningContainer(containerLauncher.submit(launch), 
                 launch));
         break;
       case CLEANUP_CONTAINER:
         RunningContainer rContainerDatum = running.remove(containerId);
+        if (rContainerDatum == null) {
+          // Container not launched. So nothing needs to be done.
+          return;
+        }
         Future<Integer> rContainer = rContainerDatum.runningcontainer;
         if (rContainer != null 
             && !rContainer.isDone()) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java Wed Nov 30 18:27:04 2011
@@ -45,12 +45,10 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -61,7 +59,6 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.ResourceStatusType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerSecurityInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenIdentifier;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.security.LocalizerTokenSecretManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -186,16 +183,30 @@ public class ContainerLocalizer {
   }
 
   Callable<Path> download(LocalDirAllocator lda, LocalResource rsrc,
-      UserGroupInformation ugi) {
-    return new FSDownload(lfs, ugi, conf, lda, rsrc, new Random());
+      UserGroupInformation ugi) throws IOException {
+    Path destPath = lda.getLocalPathForWrite(".", getEstimatedSize(rsrc), conf);
+    return new FSDownload(lfs, ugi, conf, destPath, rsrc, new Random());
+  }
+
+  static long getEstimatedSize(LocalResource rsrc) {
+    if (rsrc.getSize() < 0) {
+      return -1;
+    }
+    switch (rsrc.getType()) {
+      case ARCHIVE:
+        return 5 * rsrc.getSize();
+      case FILE:
+      default:
+        return rsrc.getSize();
+    }
   }
 
   void sleep(int duration) throws InterruptedException {
     TimeUnit.SECONDS.sleep(duration);
   }
 
-  private void localizeFiles(LocalizationProtocol nodemanager, ExecutorService exec,
-      UserGroupInformation ugi) {
+  private void localizeFiles(LocalizationProtocol nodemanager,
+      ExecutorService exec, UserGroupInformation ugi) throws IOException {
     while (true) {
       try {
         LocalizerStatus status = createStatus();

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java Wed Nov 30 18:27:04 2011
@@ -57,7 +57,6 @@ import static org.apache.hadoop.fs.Creat
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -68,7 +67,6 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.net.NetUtils;
@@ -81,6 +79,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.LocalizationProtocol;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -125,19 +124,18 @@ public class ResourceLocalizationService
   private InetSocketAddress localizationServerAddress;
   private long cacheTargetSize;
   private long cacheCleanupPeriod;
-  private List<Path> logDirs;
-  private List<Path> localDirs;
-  private List<Path> sysDirs;
+
   private final ContainerExecutor exec;
   protected final Dispatcher dispatcher;
   private final DeletionService delService;
   private LocalizerTracker localizerTracker;
   private RecordFactory recordFactory;
-  private final LocalDirAllocator localDirsSelector;
   private final ScheduledExecutorService cacheCleanup;
 
   private final LocalResourcesTracker publicRsrc;
-  
+
+  private LocalDirsHandlerService dirsHandler;
+
   /**
    * Map of LocalResourceTrackers keyed by username, for private
    * resources.
@@ -153,12 +151,15 @@ public class ResourceLocalizationService
     new ConcurrentHashMap<String,LocalResourcesTracker>();
 
   public ResourceLocalizationService(Dispatcher dispatcher,
-      ContainerExecutor exec, DeletionService delService) {
+      ContainerExecutor exec, DeletionService delService,
+      LocalDirsHandlerService dirsHandler) {
+
     super(ResourceLocalizationService.class.getName());
     this.exec = exec;
     this.dispatcher = dispatcher;
     this.delService = delService;
-    this.localDirsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
+    this.dirsHandler = dirsHandler;
+
     this.publicRsrc = new LocalResourcesTrackerImpl(null, dispatcher);
     this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
         new ThreadFactoryBuilder()
@@ -177,41 +178,31 @@ public class ResourceLocalizationService
   @Override
   public void init(Configuration conf) {
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
+
     try {
       // TODO queue deletions here, rather than NM init?
       FileContext lfs = getLocalFileContext(conf);
-      String[] sLocalDirs =
-        conf.getStrings(YarnConfiguration.NM_LOCAL_DIRS, YarnConfiguration.DEFAULT_NM_LOCAL_DIRS);
-
-      localDirs = new ArrayList<Path>(sLocalDirs.length);
-      logDirs = new ArrayList<Path>(sLocalDirs.length);
-      sysDirs = new ArrayList<Path>(sLocalDirs.length);
-      for (String sLocaldir : sLocalDirs) {
-        Path localdir = new Path(sLocaldir);
-        localDirs.add(localdir);
+      List<String> localDirs = dirsHandler.getLocalDirs();
+      for (String localDir : localDirs) {
         // $local/usercache
-        Path userdir = new Path(localdir, ContainerLocalizer.USERCACHE);
-        lfs.mkdir(userdir, null, true);
+        Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
+        lfs.mkdir(userDir, null, true);
         // $local/filecache
-        Path filedir = new Path(localdir, ContainerLocalizer.FILECACHE);
-        lfs.mkdir(filedir, null, true);
+        Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
+        lfs.mkdir(fileDir, null, true);
         // $local/nmPrivate
-        Path sysdir = new Path(localdir, NM_PRIVATE_DIR);
-        lfs.mkdir(sysdir, NM_PRIVATE_PERM, true);
-        sysDirs.add(sysdir);
-      }
-      String[] sLogdirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS, YarnConfiguration.DEFAULT_NM_LOG_DIRS);
-      for (String sLogdir : sLogdirs) {
-        Path logdir = new Path(sLogdir);
-        logDirs.add(logdir);
-        lfs.mkdir(logdir, null, true);
+        Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+        lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
+      }
+
+      List<String> logDirs = dirsHandler.getLogDirs();
+      for (String logDir : logDirs) {
+        lfs.mkdir(new Path(logDir), null, true);
       }
     } catch (IOException e) {
       throw new YarnException("Failed to initialize LocalizationService", e);
     }
-    localDirs = Collections.unmodifiableList(localDirs);
-    logDirs = Collections.unmodifiableList(logDirs);
-    sysDirs = Collections.unmodifiableList(sysDirs);
+
     cacheTargetSize =
       conf.getLong(YarnConfiguration.NM_LOCALIZER_CACHE_TARGET_SIZE_MB, YarnConfiguration.DEFAULT_NM_LOCALIZER_CACHE_TARGET_SIZE_MB) << 20;
     cacheCleanupPeriod =
@@ -391,7 +382,7 @@ public class ResourceLocalizationService
     String containerIDStr = c.toString();
     String appIDStr = ConverterUtils.toString(
         c.getContainerID().getApplicationAttemptId().getApplicationId());
-    for (Path localDir : localDirs) {
+    for (String localDir : dirsHandler.getLocalDirs()) {
 
       // Delete the user-owned container-dir
       Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@@ -428,7 +419,7 @@ public class ResourceLocalizationService
     // Delete the application directories
     userName = application.getUser();
     appIDStr = application.toString();
-    for (Path localDir : localDirs) {
+    for (String localDir : dirsHandler.getLocalDirs()) {
 
       // Delete the user-owned app-dir
       Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
@@ -574,12 +565,9 @@ public class ResourceLocalizationService
 
   class PublicLocalizer extends Thread {
 
-    static final String PUBCACHE_CTXT = "public.cache.dirs";
-
     final FileContext lfs;
     final Configuration conf;
     final ExecutorService threadPool;
-    final LocalDirAllocator publicDirs;
     final CompletionService<Path> queue;
     final Map<Future<Path>,LocalizerResourceRequestEvent> pending;
     // TODO hack to work around broken signaling
@@ -601,13 +589,23 @@ public class ResourceLocalizationService
       this.conf = conf;
       this.pending = pending;
       this.attempts = attempts;
-      String[] publicFilecache = new String[localDirs.size()];
-      for (int i = 0, n = localDirs.size(); i < n; ++i) {
-        publicFilecache[i] =
-          new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
-      }
-      conf.setStrings(PUBCACHE_CTXT, publicFilecache);
-      this.publicDirs = new LocalDirAllocator(PUBCACHE_CTXT);
+//      List<String> localDirs = dirsHandler.getLocalDirs();
+//      String[] publicFilecache = new String[localDirs.size()];
+//      for (int i = 0, n = localDirs.size(); i < n; ++i) {
+//        publicFilecache[i] =
+//          new Path(localDirs.get(i), ContainerLocalizer.FILECACHE).toString();
+//      }
+//      conf.setStrings(PUBCACHE_CTXT, publicFilecache);
+
+//      this.publicDirDestPath = new LocalDirAllocator(PUBCACHE_CTXT).getLocalPathForWrite(pathStr, conf);
+//      List<String> localDirs = dirsHandler.getLocalDirs();
+//      String[] publicFilecache = new String[localDirs.size()];
+//      int i = 0;
+//      for (String localDir : localDirs) {
+//        publicFilecache[i++] =
+//            new Path(localDir, ContainerLocalizer.FILECACHE).toString();
+//      }
+
       this.threadPool = threadPool;
       this.queue = new ExecutorCompletionService<Path>(threadPool);
     }
@@ -619,11 +617,19 @@ public class ResourceLocalizationService
       synchronized (attempts) {
         List<LocalizerResourceRequestEvent> sigh = attempts.get(key);
         if (null == sigh) {
-          pending.put(queue.submit(new FSDownload(
-                  lfs, null, conf, publicDirs,
-                  request.getResource().getRequest(), new Random())),
-              request);
-          attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+          LocalResource resource = request.getResource().getRequest();
+          try {
+            Path publicDirDestPath = dirsHandler.getLocalPathForWrite(
+                "." + Path.SEPARATOR + ContainerLocalizer.FILECACHE,
+                ContainerLocalizer.getEstimatedSize(resource), true);
+            pending.put(queue.submit(new FSDownload(
+                lfs, null, conf, publicDirDestPath, resource, new Random())),
+                request);
+            attempts.put(key, new LinkedList<LocalizerResourceRequestEvent>());
+          } catch (IOException e) {
+            LOG.error("Local path for public localization is not found. "
+                + " May be disks failed.", e);
+          }
         } else {
           sigh.add(request);
         }
@@ -844,24 +850,30 @@ public class ResourceLocalizationService
     public void run() {
       Path nmPrivateCTokensPath = null;
       try {
-        // Use LocalDirAllocator to get nmPrivateDir
+        // Get nmPrivateDir
         nmPrivateCTokensPath =
-            localDirsSelector.getLocalPathForWrite(
-                NM_PRIVATE_DIR
-                    + Path.SEPARATOR
+          dirsHandler.getLocalPathForWrite(
+                NM_PRIVATE_DIR + Path.SEPARATOR
                     + String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
-                        localizerId), getConfig());
+                        localizerId));
 
         // 0) init queue, etc.
         // 1) write credentials to private dir
         writeCredentials(nmPrivateCTokensPath);
         // 2) exec initApplication and wait
-        exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
-            context.getUser(),
-            ConverterUtils.toString(
-                context.getContainerId().
-                    getApplicationAttemptId().getApplicationId()),
-            localizerId, localDirs);
+        List<String> localDirs = dirsHandler.getLocalDirs();
+        List<String> logDirs = dirsHandler.getLogDirs();
+        if (dirsHandler.areDisksHealthy()) {
+          exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
+              context.getUser(),
+              ConverterUtils.toString(
+                  context.getContainerId().
+                  getApplicationAttemptId().getApplicationId()),
+              localizerId, localDirs, logDirs);
+        } else {
+          throw new IOException("All disks failed. "
+              + dirsHandler.getDisksHealthReport());
+        }
       // TODO handle ExitCodeException separately?
       } catch (Exception e) {
         LOG.info("Localizer failed", e);

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Nov 30 18:27:04 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -31,6 +32,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -40,10 +42,12 @@ import org.apache.hadoop.yarn.logaggrega
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+
 public class AppLogAggregatorImpl implements AppLogAggregator {
 
   private static final Log LOG = LogFactory
@@ -51,6 +55,7 @@ public class AppLogAggregatorImpl implem
   private static final int THREAD_SLEEP_TIME = 1000;
   private static final String TMP_FILE_SUFFIX = ".tmp";
 
+  private final LocalDirsHandlerService dirsHandler;
   private final Dispatcher dispatcher;
   private final ApplicationId appId;
   private final String applicationId;
@@ -58,7 +63,6 @@ public class AppLogAggregatorImpl implem
   private final Configuration conf;
   private final DeletionService delService;
   private final UserGroupInformation userUgi;
-  private final String[] rootLogDirs;
   private final Path remoteNodeLogFileForApp;
   private final Path remoteNodeTmpLogFileForApp;
   private final ContainerLogsRetentionPolicy retentionPolicy;
@@ -72,7 +76,7 @@ public class AppLogAggregatorImpl implem
 
   public AppLogAggregatorImpl(Dispatcher dispatcher,
       DeletionService deletionService, Configuration conf, ApplicationId appId,
-      UserGroupInformation userUgi, String[] localRootLogDirs,
+      UserGroupInformation userUgi, LocalDirsHandlerService dirsHandler,
       Path remoteNodeLogFileForApp,
       ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls) {
@@ -82,7 +86,7 @@ public class AppLogAggregatorImpl implem
     this.appId = appId;
     this.applicationId = ConverterUtils.toString(appId);
     this.userUgi = userUgi;
-    this.rootLogDirs = localRootLogDirs;
+    this.dirsHandler = dirsHandler;
     this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
     this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     this.retentionPolicy = retentionPolicy;
@@ -115,9 +119,11 @@ public class AppLogAggregatorImpl implem
       }
     }
 
-    LOG.info("Uploading logs for container " + containerId);
+    LOG.info("Uploading logs for container " + containerId
+        + ". Current good log dirs are "
+        + StringUtils.join(",", dirsHandler.getLogDirs()));
     LogKey logKey = new LogKey(containerId);
-    LogValue logValue = new LogValue(this.rootLogDirs, containerId);
+    LogValue logValue = new LogValue(dirsHandler.getLogDirs(), containerId);
     try {
       this.writer.append(logKey, logValue);
     } catch (IOException e) {
@@ -150,9 +156,10 @@ public class AppLogAggregatorImpl implem
     }
 
     // Remove the local app-log-dirs
-    Path[] localAppLogDirs = new Path[this.rootLogDirs.length];
+    List<String> rootLogDirs = dirsHandler.getLogDirs();
+    Path[] localAppLogDirs = new Path[rootLogDirs.size()];
     int index = 0;
-    for (String rootLogDir : this.rootLogDirs) {
+    for (String rootLogDir : rootLogDirs) {
       localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
       index++;
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Nov 30 18:27:04 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.logaggrega
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -85,7 +86,7 @@ public class LogAggregationService exten
   private final DeletionService deletionService;
   private final Dispatcher dispatcher;
 
-  private String[] localRootLogDirs;
+  private LocalDirsHandlerService dirsHandler;
   Path remoteRootLogDir;
   String remoteRootLogDirSuffix;
   private NodeId nodeId;
@@ -95,11 +96,12 @@ public class LogAggregationService exten
   private final ExecutorService threadPool;
 
   public LogAggregationService(Dispatcher dispatcher, Context context,
-      DeletionService deletionService) {
+      DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
     super(LogAggregationService.class.getName());
     this.dispatcher = dispatcher;
     this.context = context;
     this.deletionService = deletionService;
+    this.dirsHandler = dirsHandler;
     this.appLogAggregators =
         new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
     this.threadPool = Executors.newCachedThreadPool(
@@ -109,9 +111,6 @@ public class LogAggregationService exten
   }
 
   public synchronized void init(Configuration conf) {
-    this.localRootLogDirs =
-        conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
-            YarnConfiguration.DEFAULT_NM_LOG_DIRS);
     this.remoteRootLogDir =
         new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@@ -291,9 +290,10 @@ public class LogAggregationService exten
 
     // New application
     AppLogAggregator appLogAggregator =
-        new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId,
-            userUgi, this.localRootLogDirs, 
-            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, appAcls);
+        new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
+            getConfig(), appId, userUgi, dirsHandler,
+            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+            appAcls);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnException("Duplicate initApp for " + appId);
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java Wed Nov 30 18:27:04 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
@@ -53,15 +55,16 @@ public class NonAggregatingLogHandler ex
   private final DeletionService delService;
   private final Map<ApplicationId, String> appOwners;
 
-  private String[] rootLogDirs;
+  private final LocalDirsHandlerService dirsHandler;
   private long deleteDelaySeconds;
   private ScheduledThreadPoolExecutor sched;
 
   public NonAggregatingLogHandler(Dispatcher dispatcher,
-      DeletionService delService) {
+      DeletionService delService, LocalDirsHandlerService dirsHandler) {
     super(NonAggregatingLogHandler.class.getName());
     this.dispatcher = dispatcher;
     this.delService = delService;
+    this.dirsHandler = dirsHandler;
     this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
   }
 
@@ -70,9 +73,6 @@ public class NonAggregatingLogHandler ex
     // Default 3 hours.
     this.deleteDelaySeconds =
         conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 3 * 60 * 60);
-    this.rootLogDirs =
-        conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
-            YarnConfiguration.DEFAULT_NM_LOG_DIRS);
     sched = createScheduledThreadPoolExecutor(conf);
     super.init(conf);
   }
@@ -145,10 +145,11 @@ public class NonAggregatingLogHandler ex
     @Override
     @SuppressWarnings("unchecked")
     public void run() {
-      Path[] localAppLogDirs =
-          new Path[NonAggregatingLogHandler.this.rootLogDirs.length];
+      List<String> rootLogDirs =
+          NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
+      Path[] localAppLogDirs = new Path[rootLogDirs.size()];
       int index = 0;
-      for (String rootLogDir : NonAggregatingLogHandler.this.rootLogDirs) {
+      for (String rootLogDir : rootLogDirs) {
         localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
         index++;
       }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerLogsPage.java Wed Nov 30 18:27:04 2011
@@ -34,15 +34,14 @@ import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
@@ -87,17 +86,18 @@ public class ContainerLogsPage extends N
   public static class ContainersLogsBlock extends HtmlBlock implements
       YarnWebParams {    
     private final Configuration conf;
-    private final LocalDirAllocator logsSelector;
     private final Context nmContext;
     private final ApplicationACLsManager aclsManager;
+    private final LocalDirsHandlerService dirsHandler;
 
     @Inject
     public ContainersLogsBlock(Configuration conf, Context context,
-        ApplicationACLsManager aclsManager) {
+        ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
       this.conf = conf;
-      this.logsSelector = new LocalDirAllocator(YarnConfiguration.NM_LOG_DIRS);
       this.nmContext = context;
       this.aclsManager = aclsManager;
+      this.dirsHandler = dirsHandler;
     }
 
     @Override
@@ -198,11 +198,10 @@ public class ContainerLogsPage extends N
         File logFile = null;
         try {
           logFile =
-              new File(this.logsSelector
-                  .getLocalPathToRead(
-                      ContainerLaunch.getRelativeContainerLogDir(
-                          applicationId.toString(), containerId.toString())
-                          + Path.SEPARATOR + $(CONTAINER_LOG_TYPE), this.conf)
+              new File(this.dirsHandler.getLogPathToRead(
+                  ContainerLaunch.getRelativeContainerLogDir(
+                  applicationId.toString(), containerId.toString())
+                  + Path.SEPARATOR + $(CONTAINER_LOG_TYPE))
                   .toUri().getPath());
         } catch (Exception e) {
           html.h1("Cannot find this log on the local disk.");
@@ -272,8 +271,8 @@ public class ContainerLogsPage extends N
         }
       } else {
         // Just print out the log-types
-        List<File> containerLogsDirs =
-            getContainerLogDirs(this.conf, containerId);
+        List<File> containerLogsDirs = getContainerLogDirs(containerId,
+            dirsHandler);
         boolean foundLogFile = false;
         for (File containerLogsDir : containerLogsDirs) {
           for (File logFile : containerLogsDir.listFiles()) {
@@ -293,11 +292,10 @@ public class ContainerLogsPage extends N
       return;
     }
 
-    static List<File>
-        getContainerLogDirs(Configuration conf, ContainerId containerId) {
-      String[] logDirs = conf.getStrings(YarnConfiguration.NM_LOG_DIRS,
-          YarnConfiguration.DEFAULT_NM_LOG_DIRS);
-      List<File> containerLogDirs = new ArrayList<File>(logDirs.length);
+    static List<File> getContainerLogDirs(ContainerId containerId,
+            LocalDirsHandlerService dirsHandler) {
+      List<String> logDirs = dirsHandler.getLogDirs();
+      List<File> containerLogDirs = new ArrayList<File>(logDirs.size());
       for (String logDir : logDirs) {
         String appIdStr = 
             ConverterUtils.toString(

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java Wed Nov 30 18:27:04 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -42,10 +43,11 @@ public class WebServer extends AbstractS
   private WebApp webApp;
 
   public WebServer(Context nmContext, ResourceView resourceView,
-      ApplicationACLsManager aclsManager) {
+      ApplicationACLsManager aclsManager,
+      LocalDirsHandlerService dirsHandler) {
     super(WebServer.class.getName());
     this.nmContext = nmContext;
-    this.nmWebApp = new NMWebApp(resourceView, aclsManager);
+    this.nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
   }
 
   @Override
@@ -81,17 +83,21 @@ public class WebServer extends AbstractS
 
     private final ResourceView resourceView;
     private final ApplicationACLsManager aclsManager;
+    private final LocalDirsHandlerService dirsHandler;
 
     public NMWebApp(ResourceView resourceView,
-        ApplicationACLsManager aclsManager) {
+        ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
       this.resourceView = resourceView;
       this.aclsManager = aclsManager;
+      this.dirsHandler = dirsHandler;
     }
 
     @Override
     public void setup() {
       bind(ResourceView.class).toInstance(this.resourceView);
       bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
+      bind(LocalDirsHandlerService.class).toInstance(dirsHandler);
       route("/", NMController.class, "info");
       route("/node", NMController.class, "node");
       route("/allApplications", NMController.class, "allApplications");

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.c Wed Nov 30 18:27:04 2011
@@ -261,8 +261,15 @@ char * get_value(const char* key) {
  * Value delimiter is assumed to be a comma.
  */
 char ** get_values(const char * key) {
-  char ** toPass = NULL;
   char *value = get_value(key);
+  return extract_values(value);
+}
+
+/**
+ * Extracts array of values from the comma separated list of values.
+ */
+char ** extract_values(char *value) {
+  char ** toPass = NULL;
   char *tempTok = NULL;
   char *tempstr = NULL;
   int size = 0;
@@ -276,8 +283,7 @@ char ** get_values(const char * key) {
       toPass[size++] = tempTok;
       if(size == toPassSize) {
         toPassSize += MAX_SIZE;
-        toPass = (char **) realloc(toPass,(sizeof(char *) *
-                                           (MAX_SIZE * toPassSize)));
+        toPass = (char **) realloc(toPass,(sizeof(char *) * toPassSize));
       }
       tempTok = strtok_r(NULL, ",", &tempstr);
     }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/configuration.h Wed Nov 30 18:27:04 2011
@@ -34,6 +34,9 @@ char *get_value(const char* key);
 //comma seperated strings.
 char ** get_values(const char* key);
 
+// Extracts array of values from the comma separated list of values.
+char ** extract_values(char *value);
+
 // free the memory returned by get_values
 void free_values(char** values);
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.c Wed Nov 30 18:27:04 2011
@@ -357,7 +357,7 @@ int mkdirs(const char* path, mode_t perm
  * It creates the container work and log directories.
  */
 static int create_container_directories(const char* user, const char *app_id, 
-					const char *container_id) {
+    const char *container_id, char* const* local_dir, char* const* log_dir) {
   // create dirs as 0750
   const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
   if (app_id == NULL || container_id == NULL || user == NULL) {
@@ -367,20 +367,11 @@ static int create_container_directories(
   }
 
   int result = -1;
-
-  char **local_dir = get_values(NM_SYS_DIR_KEY);
-
-  if (local_dir == NULL) {
-    fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
-    return -1;
-  }
-
-  char **local_dir_ptr;
+  char* const* local_dir_ptr;
   for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
     char *container_dir = get_container_work_directory(*local_dir_ptr, user, app_id, 
                                                 container_id);
     if (container_dir == NULL) {
-      free_values(local_dir);
       return -1;
     }
     if (mkdirs(container_dir, perms) == 0) {
@@ -390,7 +381,6 @@ static int create_container_directories(
     free(container_dir);
 
   }
-  free_values(local_dir);
   if (result != 0) {
     return result;
   }
@@ -404,19 +394,11 @@ static int create_container_directories(
   } else {
     sprintf(combined_name, "%s/%s", app_id, container_id);
 
-    char **log_dir = get_values(NM_LOG_DIR_KEY);
-    if (log_dir == NULL) {
-      free(combined_name);
-      fprintf(LOGFILE, "%s is not configured.\n", NM_LOG_DIR_KEY);
-      return -1;
-    }
-
-    char **log_dir_ptr;
+    char* const* log_dir_ptr;
     for(log_dir_ptr = log_dir; *log_dir_ptr != NULL; ++log_dir_ptr) {
       char *container_log_dir = get_app_log_directory(*log_dir_ptr, combined_name);
       if (container_log_dir == NULL) {
         free(combined_name);
-        free_values(log_dir);
         return -1;
       } else if (mkdirs(container_log_dir, perms) != 0) {
     	free(container_log_dir);
@@ -426,7 +408,6 @@ static int create_container_directories(
       }
     }
     free(combined_name);
-    free_values(log_dir);
   }
   return result;
 }
@@ -660,17 +641,12 @@ static int copy_file(int input, const ch
 /**
  * Function to initialize the user directories of a user.
  */
-int initialize_user(const char *user) {
-  char **local_dir = get_values(NM_SYS_DIR_KEY);
-  if (local_dir == NULL) {
-    fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
-    return INVALID_NM_ROOT_DIRS;
-  }
+int initialize_user(const char *user, char* const* local_dirs) {
 
   char *user_dir;
-  char **local_dir_ptr = local_dir;
+  char* const* local_dir_ptr;
   int failed = 0;
-  for(local_dir_ptr = local_dir; *local_dir_ptr != 0; ++local_dir_ptr) {
+  for(local_dir_ptr = local_dirs; *local_dir_ptr != 0; ++local_dir_ptr) {
     user_dir = get_user_directory(*local_dir_ptr, user);
     if (user_dir == NULL) {
       fprintf(LOGFILE, "Couldn't get userdir directory for %s.\n", user);
@@ -682,32 +658,29 @@ int initialize_user(const char *user) {
     }
     free(user_dir);
   }
-  free_values(local_dir);
   return failed ? INITIALIZE_USER_FAILED : 0;
 }
 
 /**
  * Function to prepare the application directories for the container.
  */
-int initialize_app(const char *user, const char *app_id, 
-		   const char* nmPrivate_credentials_file, char* const* args) {
+int initialize_app(const char *user, const char *app_id,
+                   const char* nmPrivate_credentials_file,
+                   char* const* local_dirs, char* const* log_roots,
+                   char* const* args) {
   if (app_id == NULL || user == NULL) {
     fprintf(LOGFILE, "Either app_id is null or the user passed is null.\n");
     return INVALID_ARGUMENT_NUMBER;
   }
 
   // create the user directory on all disks
-  int result = initialize_user(user);
+  int result = initialize_user(user, local_dirs);
   if (result != 0) {
     return result;
   }
 
   ////////////// create the log directories for the app on all disks
-  char **log_roots = get_values(NM_LOG_DIR_KEY);
-  if (log_roots == NULL) {
-    return INVALID_CONFIG_FILE;
-  }
-  char **log_root;
+  char* const* log_root;
   char *any_one_app_log_dir = NULL;
   for(log_root=log_roots; *log_root != NULL; ++log_root) {
     char *app_log_dir = get_app_log_directory(*log_root, app_id);
@@ -722,7 +695,7 @@ int initialize_app(const char *user, con
       free(app_log_dir);
     }
   }
-  free_values(log_roots);
+
   if (any_one_app_log_dir == NULL) {
     fprintf(LOGFILE, "Did not create any app-log directories\n");
     return -1;
@@ -743,15 +716,9 @@ int initialize_app(const char *user, con
 
   // 750
   mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP;
-  char **nm_roots = get_values(NM_SYS_DIR_KEY);
-
-  if (nm_roots == NULL) {
-    return INVALID_CONFIG_FILE;
-  }
-
-  char **nm_root;
+  char* const* nm_root;
   char *primary_app_dir = NULL;
-  for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
+  for(nm_root=local_dirs; *nm_root != NULL; ++nm_root) {
     char *app_dir = get_app_directory(*nm_root, user, app_id);
     if (app_dir == NULL) {
       // try the next one
@@ -763,7 +730,7 @@ int initialize_app(const char *user, con
       free(app_dir);
     }
   }
-  free_values(nm_roots);
+
   if (primary_app_dir == NULL) {
     fprintf(LOGFILE, "Did not create any app directories\n");
     return -1;
@@ -805,9 +772,10 @@ int initialize_app(const char *user, con
 }
 
 int launch_container_as_user(const char *user, const char *app_id, 
-                     const char *container_id, const char *work_dir,
-                     const char *script_name, const char *cred_file,
-                     const char* pid_file) {
+                   const char *container_id, const char *work_dir,
+                   const char *script_name, const char *cred_file,
+                   const char* pid_file, char* const* local_dirs,
+                   char* const* log_dirs) {
   int exit_code = -1;
   char *script_file_dest = NULL;
   char *cred_file_dest = NULL;
@@ -854,7 +822,8 @@ int launch_container_as_user(const char 
     goto cleanup;
   }
 
-  if (create_container_directories(user, app_id, container_id) != 0) {
+  if (create_container_directories(user, app_id, container_id, local_dirs,
+                                   log_dirs) != 0) {
     fprintf(LOGFILE, "Could not create container dirs");
     goto cleanup;
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/container-executor.h Wed Nov 30 18:27:04 2011
@@ -61,8 +61,6 @@ enum errorcodes {
 #define NM_APP_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s"
 #define CONTAINER_DIR_PATTERN NM_APP_DIR_PATTERN "/%s"
 #define CONTAINER_SCRIPT "launch_container.sh"
-#define NM_SYS_DIR_KEY "yarn.nodemanager.local-dirs"
-#define NM_LOG_DIR_KEY "yarn.nodemanager.log-dirs"
 #define CREDENTIALS_FILENAME "container_tokens"
 #define MIN_USERID_KEY "min.user.id"
 #define BANNED_USERS_KEY "banned.users"
@@ -92,12 +90,13 @@ int check_executor_permissions(char *exe
 
 // initialize the application directory
 int initialize_app(const char *user, const char *app_id,
-                   const char *credentials, char* const* args);
+                   const char *credentials, char* const* local_dirs,
+                   char* const* log_dirs, char* const* args);
 
 /*
  * Function used to launch a container as the provided user. It does the following :
  * 1) Creates container work dir and log dir to be accessible by the child
- * 2) Copies the script file from the TT to the work directory
+ * 2) Copies the script file from the NM to the work directory
  * 3) Sets up the environment
  * 4) Does an execlp on the same in order to replace the current image with
  *    container image.
@@ -109,12 +108,15 @@ int initialize_app(const char *user, con
  * @param cred_file the credentials file that needs to be compied to the
  * working directory.
  * @param pid_file file where pid of process should be written to
+ * @param local_dirs nodemanager-local-directories to be used
+ * @param log_dirs nodemanager-log-directories to be used
  * @return -1 or errorcode enum value on error (should never return on success).
  */
 int launch_container_as_user(const char * user, const char *app_id,
                      const char *container_id, const char *work_dir,
                      const char *script_name, const char *cred_file,
-                     const char *pid_file);
+                     const char *pid_file, char* const* local_dirs,
+                     char* const* log_dirs);
 
 /**
  * Function used to signal a container launched by the user.
@@ -181,7 +183,7 @@ int mkdirs(const char* path, mode_t perm
 /**
  * Function to initialize the user directories of a user.
  */
-int initialize_user(const char *user);
+int initialize_user(const char *user, char* const* local_dirs);
 
 /**
  * Create a top level directory for the user.

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c Wed Nov 30 18:27:04 2011
@@ -43,10 +43,11 @@ void display_usage(FILE *stream) {
   fprintf(stream,
       "Usage: container-executor user command command-args\n");
   fprintf(stream, "Commands:\n");
-  fprintf(stream, "   initialize container: %2d appid tokens cmd app...\n",
-	  INITIALIZE_CONTAINER);
+  fprintf(stream, "   initialize container: %2d appid tokens " \
+   "nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
   fprintf(stream,
-      "   launch container:    %2d appid containerid workdir container-script tokens pidfile\n",
+      "   launch container:    %2d appid containerid workdir "\
+      "container-script tokens pidfile nm-local-dirs nm-log-dirs\n",
 	  LAUNCH_CONTAINER);
   fprintf(stream, "   signal container:    %2d container-pid signal\n",
 	  SIGNAL_CONTAINER);
@@ -96,6 +97,7 @@ int main(int argc, char **argv) {
 
   char *orig_conf_file = STRINGIFY(HADOOP_CONF_DIR) "/" CONF_FILENAME;
   char *conf_file = realpath(orig_conf_file, NULL);
+  char *local_dirs, *log_dirs;
 
   if (conf_file == NULL) {
     fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
@@ -158,20 +160,23 @@ int main(int argc, char **argv) {
 
   switch (command) {
   case INITIALIZE_CONTAINER:
-    if (argc < 6) {
-      fprintf(ERRORFILE, "Too few arguments (%d vs 6) for initialize container\n",
+    if (argc < 8) {
+      fprintf(ERRORFILE, "Too few arguments (%d vs 8) for initialize container\n",
 	      argc);
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
     }
     app_id = argv[optind++];
     cred_file = argv[optind++];
+    local_dirs = argv[optind++];// good local dirs as a comma separated list
+    log_dirs = argv[optind++];// good log dirs as a comma separated list
     exit_code = initialize_app(user_detail->pw_name, app_id, cred_file,
-                               argv + optind);
+                               extract_values(local_dirs),
+                               extract_values(log_dirs), argv + optind);
     break;
   case LAUNCH_CONTAINER:
-    if (argc < 9) {
-      fprintf(ERRORFILE, "Too few arguments (%d vs 9) for launch container\n",
+    if (argc != 11) {
+      fprintf(ERRORFILE, "Too few arguments (%d vs 11) for launch container\n",
 	      argc);
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
@@ -182,13 +187,17 @@ int main(int argc, char **argv) {
     script_file = argv[optind++];
     cred_file = argv[optind++];
     pid_file = argv[optind++];
-    exit_code = launch_container_as_user(user_detail->pw_name, app_id, container_id,
-                                 current_dir, script_file, cred_file, pid_file);
+    local_dirs = argv[optind++];// good local dirs as a comma separated list
+    log_dirs = argv[optind++];// good log dirs as a comma separated list
+    exit_code = launch_container_as_user(user_detail->pw_name, app_id,
+                    container_id, current_dir, script_file, cred_file,
+                    pid_file, extract_values(local_dirs),
+                    extract_values(log_dirs));
     break;
   case SIGNAL_CONTAINER:
-    if (argc < 5) {
-      fprintf(ERRORFILE, "Too few arguments (%d vs 5) for signal container\n",
-	      argc);
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Wrong number of arguments (%d vs 5) for " \
+          "signal container\n", argc);
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
     } else {

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/test/test-container-executor.c Wed Nov 30 18:27:04 2011
@@ -28,10 +28,17 @@
 #include <sys/stat.h>
 #include <sys/wait.h>
 
-#define TEST_ROOT "/tmp/test-container-controller"
+#define TEST_ROOT "/tmp/test-container-executor"
 #define DONT_TOUCH_FILE "dont-touch-me"
+#define NM_LOCAL_DIRS       TEST_ROOT "/local-1," TEST_ROOT "/local-2," \
+               TEST_ROOT "/local-3," TEST_ROOT "/local-4," TEST_ROOT "/local-5"
+#define NM_LOG_DIRS         TEST_ROOT "/logdir_1," TEST_ROOT "/logdir_2," \
+                            TEST_ROOT "/logdir_3," TEST_ROOT "/logdir_4"
+#define ARRAY_SIZE 1000
 
 static char* username = NULL;
+static char* local_dirs = NULL;
+static char* log_dirs = NULL;
 
 /**
  * Run the command using the effective user id.
@@ -84,40 +91,33 @@ void run(const char *cmd) {
 
 int write_config_file(char *file_name) {
   FILE *file;
-  int i = 0;
   file = fopen(file_name, "w");
   if (file == NULL) {
     printf("Failed to open %s.\n", file_name);
     return EXIT_FAILURE;
   }
-  fprintf(file, "yarn.nodemanager.local-dirs=" TEST_ROOT "/local-1");
-  for(i=2; i < 5; ++i) {
-    fprintf(file, "," TEST_ROOT "/local-%d", i);
-  }
-  fprintf(file, "\n");
-  fprintf(file, "yarn.nodemanager.log-dirs=" TEST_ROOT "/logs\n");
+  fprintf(file, "banned.users=bannedUser\n");
+  fprintf(file, "min.user.id=1000\n");
   fclose(file);
   return 0;
 }
 
-void create_nm_roots() {
-  char** nm_roots = get_values(NM_SYS_DIR_KEY);
+void create_nm_roots(char ** nm_roots) {
   char** nm_root;
   for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
     if (mkdir(*nm_root, 0755) != 0) {
       printf("FAIL: Can't create directory %s - %s\n", *nm_root,
-	     strerror(errno));
+             strerror(errno));
       exit(1);
     }
     char buffer[100000];
     sprintf(buffer, "%s/usercache", *nm_root);
     if (mkdir(buffer, 0755) != 0) {
       printf("FAIL: Can't create directory %s - %s\n", buffer,
-	     strerror(errno));
+             strerror(errno));
       exit(1);
     }
   }
-  free_values(nm_roots);
 }
 
 void test_get_user_directory() {
@@ -209,7 +209,7 @@ void test_check_configuration_permission
 }
 
 void test_delete_container() {
-  if (initialize_user(username)) {
+  if (initialize_user(username, extract_values(local_dirs))) {
     printf("FAIL: failed to initialize user %s\n", username);
     exit(1);
   }
@@ -504,7 +504,8 @@ void test_init_app() {
     exit(1);
   } else if (child == 0) {
     char *final_pgm[] = {"touch", "my-touch-file", 0};
-    if (initialize_app(username, "app_4", TEST_ROOT "/creds.txt", final_pgm) != 0) {
+    if (initialize_app(username, "app_4", TEST_ROOT "/creds.txt", final_pgm,
+        extract_values(local_dirs), extract_values(log_dirs)) != 0) {
       printf("FAIL: failed in child\n");
       exit(42);
     }
@@ -598,7 +599,8 @@ void test_run_container() {
     exit(1);
   } else if (child == 0) {
     if (launch_container_as_user(username, "app_4", "container_1", 
-                         container_dir, script_name, TEST_ROOT "/creds.txt", pid_file) != 0) {
+          container_dir, script_name, TEST_ROOT "/creds.txt", pid_file,
+          extract_values(local_dirs), extract_values(log_dirs)) != 0) {
       printf("FAIL: failed in child\n");
       exit(42);
     }
@@ -677,7 +679,12 @@ int main(int argc, char **argv) {
   }
   read_config(TEST_ROOT "/test.cfg");
 
-  create_nm_roots();
+  local_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
+  strcpy(local_dirs, NM_LOCAL_DIRS);
+  log_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
+  strcpy(log_dirs, NM_LOG_DIRS);
+
+  create_nm_roots(extract_values(local_dirs));
 
   if (getuid() == 0 && argc == 2) {
     username = argv[1];

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java Wed Nov 30 18:27:04 2011
@@ -60,16 +60,18 @@ public class DummyContainerManager exten
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       NodeManagerMetrics metrics,
       ContainerTokenSecretManager containerTokenSecretManager,
-      ApplicationACLsManager applicationACLsManager) {
+      ApplicationACLsManager applicationACLsManager,
+      LocalDirsHandlerService dirsHandler) {
     super(context, exec, deletionContext, nodeStatusUpdater, metrics,
-        containerTokenSecretManager, applicationACLsManager);
+        containerTokenSecretManager, applicationACLsManager, dirsHandler);
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  protected ResourceLocalizationService createResourceLocalizationService(ContainerExecutor exec,
-      DeletionService deletionContext) {
-    return new ResourceLocalizationService(super.dispatcher, exec, deletionContext) {
+  protected ResourceLocalizationService createResourceLocalizationService(
+      ContainerExecutor exec, DeletionService deletionContext) {
+    return new ResourceLocalizationService(super.dispatcher, exec,
+        deletionContext, super.dirsHandler) {
       @Override
       public void handle(LocalizationEvent event) {
         switch (event.getType()) {
@@ -125,7 +127,8 @@ public class DummyContainerManager exten
   @SuppressWarnings("unchecked")
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
-    return new ContainersLauncher(context, super.dispatcher, exec) {
+    return new ContainersLauncher(context, super.dispatcher, exec,
+                                  super.dirsHandler) {
       @Override
       public void handle(ContainersLauncherEvent event) {
         Container container = event.getContainer();
@@ -139,7 +142,8 @@ public class DummyContainerManager exten
         case CLEANUP_CONTAINER:
           dispatcher.getEventHandler().handle(
               new ContainerExitEvent(containerId,
-                  ContainerEventType.CONTAINER_KILLED_ON_REQUEST, 0));
+                  ContainerEventType.CONTAINER_KILLED_ON_REQUEST, 0,
+                  "Container exited with exit code 0."));
           break;
         }
       }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Wed Nov 30 18:27:04 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.no
 import java.io.File;
 import java.io.IOException;
 
-import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -80,9 +79,12 @@ public class TestEventFlow {
 
     ContainerExecutor exec = new DefaultContainerExecutor();
     exec.setConf(conf);
+
     DeletionService del = new DeletionService(exec);
     Dispatcher dispatcher = new AsyncDispatcher();
-    NodeHealthCheckerService healthChecker = null;
+    NodeHealthCheckerService healthChecker = new NodeHealthCheckerService();
+    healthChecker.init(conf);
+    LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
     NodeManagerMetrics metrics = NodeManagerMetrics.create();
     ContainerTokenSecretManager containerTokenSecretManager =  new ContainerTokenSecretManager();
     NodeStatusUpdater nodeStatusUpdater =
@@ -100,7 +102,8 @@ public class TestEventFlow {
 
     DummyContainerManager containerManager = new DummyContainerManager(
         context, exec, del, nodeStatusUpdater, metrics,
-        containerTokenSecretManager, new ApplicationACLsManager(conf));
+        containerTokenSecretManager, new ApplicationACLsManager(conf),
+        dirsHandler);
     containerManager.init(conf);
     containerManager.start();
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java?rev=1208644&r1=1208643&r2=1208644&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLinuxContainerExecutor.java Wed Nov 30 18:27:04 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -63,8 +64,6 @@ import org.junit.Test;
  * config values.
  * <br><pre><code>
  * > cat /etc/hadoop/container-executor.cfg
- * yarn.nodemanager.local-dirs=/tmp/hadoop/nm-local/
- * yarn.nodemanager.log-dirs=/tmp/hadoop/nm-log
  * yarn.nodemanager.linux-container-executor.group=mapred
  * #depending on the user id of the application.submitter option
  * min.user.id=1
@@ -72,7 +71,7 @@ import org.junit.Test;
  * > sudo chmod 444 /etc/hadoop/container-executor.cfg
  * </code></pre>
  * 
- * <li>iMove the binary and set proper permissions on it. It needs to be owned 
+ * <li>Move the binary and set proper permissions on it. It needs to be owned 
  * by root, the group needs to be the group configured in container-executor.cfg, 
  * and it needs the setuid bit set. (The build will also overwrite it so you
  * need to move it to a place that you can support it. 
@@ -98,14 +97,22 @@ public class TestLinuxContainerExecutor 
   
   private LinuxContainerExecutor exec = null;
   private String appSubmitter = null;
+  private LocalDirsHandlerService dirsHandler;
 
   @Before
   public void setup() throws Exception {
-    FileContext.getLocalFSFileContext().mkdir(
-        new Path(workSpace.getAbsolutePath()), null, true);
+    FileContext files = FileContext.getLocalFSFileContext();
+    Path workSpacePath = new Path(workSpace.getAbsolutePath());
+    files.mkdir(workSpacePath, null, true);
     workSpace.setReadable(true, false);
     workSpace.setExecutable(true, false);
     workSpace.setWritable(true, false);
+    File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
+    files.mkdir(new Path(localDir.getAbsolutePath()),
+        new FsPermission("777"), false);
+    File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
+    files.mkdir(new Path(logDir.getAbsolutePath()),
+        new FsPermission("777"), false);
     String exec_path = System.getProperty("container-executor.path");
     if(exec_path != null && !exec_path.isEmpty()) {
       Configuration conf = new Configuration(false);
@@ -114,6 +121,10 @@ public class TestLinuxContainerExecutor 
       conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
       exec = new LinuxContainerExecutor();
       exec.setConf(conf);
+      conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
+      conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
+      dirsHandler = new LocalDirsHandlerService();
+      dirsHandler.init(conf);
     }
     appSubmitter = System.getProperty("application.submitter");
     if(appSubmitter == null || appSubmitter.isEmpty()) {
@@ -189,7 +200,8 @@ public class TestLinuxContainerExecutor 
 
     exec.activateContainer(cId, pidFile);
     return exec.launchContainer(container, scriptPath, tokensPath,
-        appSubmitter, appId, workDir);
+        appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
+        dirsHandler.getLogDirs());
   }
   
   



Mime
View raw message