hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject [1/2] hadoop git commit: YARN-6876. Create an abstract log writer for extendability. Contributed by Xuan Gong.
Date Thu, 24 Aug 2017 22:26:06 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 bf903396a -> e12358c5c


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12358c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 1601c3f..51c63c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -19,11 +19,7 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -38,8 +34,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
@@ -57,7 +51,9 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
@@ -71,7 +67,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.Times;
 
@@ -86,18 +81,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private static final Logger LOG =
        LoggerFactory.getLogger(AppLogAggregatorImpl.class);
   private static final int THREAD_SLEEP_TIME = 1000;
-  // This is temporary solution. The configuration will be deleted once
-  // we find a more scalable method to only write a single log file per LRS.
-  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
-      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
-  private static final int
-      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
-  
-  // This configuration is for debug and test purpose. By setting
-  // this configuration as true. We can break the lower bound of
-  // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
-  private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
-      = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
 
   private final LocalDirsHandlerService dirsHandler;
   private final Dispatcher dispatcher;
@@ -118,10 +101,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final FileContext lfs;
   private final LogAggregationContext logAggregationContext;
   private final Context context;
-  private final int retentionSize;
-  private final long rollingMonitorInterval;
-  private final boolean logAggregationInRolling;
   private final NodeId nodeId;
+  private final LogAggregationFileControllerContext logControllerContext;
 
   // These variables are only for testing
   private final AtomicBoolean waiting = new AtomicBoolean(false);
@@ -134,6 +115,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       new HashMap<ContainerId, ContainerLogAggregator>();
   private final ContainerLogAggregationPolicy logAggPolicy;
 
+  private final LogAggregationFileController logAggregationFileController;
+
 
   /**
    * The value recovered from state store to determine the age of application
@@ -151,7 +134,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       FileContext lfs, long rollingMonitorInterval) {
     this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
         dirsHandler, remoteNodeLogFileForApp, appAcls,
-        logAggregationContext, context, lfs, rollingMonitorInterval, -1);
+        logAggregationContext, context, lfs, rollingMonitorInterval, -1, null);
   }
 
   public AppLogAggregatorImpl(Dispatcher dispatcher,
@@ -162,6 +145,21 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       LogAggregationContext logAggregationContext, Context context,
       FileContext lfs, long rollingMonitorInterval,
       long recoveredLogInitedTime) {
+    this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
+        dirsHandler, remoteNodeLogFileForApp, appAcls,
+        logAggregationContext, context, lfs, rollingMonitorInterval,
+        recoveredLogInitedTime, null);
+  }
+
+  public AppLogAggregatorImpl(Dispatcher dispatcher,
+      DeletionService deletionService, Configuration conf,
+      ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
+      LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
+      Map<ApplicationAccessType, String> appAcls,
+      LogAggregationContext logAggregationContext, Context context,
+      FileContext lfs, long rollingMonitorInterval,
+      long recoveredLogInitedTime,
+      LogAggregationFileController logAggregationFileController) {
     this.dispatcher = dispatcher;
     this.conf = conf;
     this.delService = deletionService;
@@ -169,31 +167,41 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.applicationId = appId.toString();
     this.userUgi = userUgi;
     this.dirsHandler = dirsHandler;
-    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
-    this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
     this.appAcls = appAcls;
     this.lfs = lfs;
     this.logAggregationContext = logAggregationContext;
     this.context = context;
     this.nodeId = nodeId;
-    int configuredRentionSize =
-        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
-            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
-    if (configuredRentionSize <= 0) {
-      this.retentionSize =
-          DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+    this.logAggPolicy = getLogAggPolicy(conf);
+    this.recoveredLogInitedTime = recoveredLogInitedTime;
+    if (logAggregationFileController == null) {
+      // by default, use T-File Controller
+      this.logAggregationFileController = new LogAggregationTFileController();
+      this.logAggregationFileController.initialize(conf, "TFile");
+      this.logAggregationFileController.verifyAndCreateRemoteLogDir();
+      this.logAggregationFileController.createAppDir(
+          this.userUgi.getShortUserName(), appId, userUgi);
+      this.remoteNodeLogFileForApp = this.logAggregationFileController
+          .getRemoteNodeLogFileForApp(appId,
+              this.userUgi.getShortUserName(), nodeId);
+      this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     } else {
-      this.retentionSize = configuredRentionSize;
+      this.logAggregationFileController = logAggregationFileController;
+      this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
+      this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
     }
-    this.rollingMonitorInterval = rollingMonitorInterval;
-    this.logAggregationInRolling =
-        this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
+    boolean logAggregationInRolling =
+        rollingMonitorInterval <= 0 || this.logAggregationContext == null
             || this.logAggregationContext.getRolledLogsIncludePattern() == null
             || this.logAggregationContext.getRolledLogsIncludePattern()
-              .isEmpty() ? false : true;
-    this.logAggPolicy = getLogAggPolicy(conf);
-    this.recoveredLogInitedTime = recoveredLogInitedTime;
+                .isEmpty() ? false : true;
+    logControllerContext = new LogAggregationFileControllerContext(
+            this.remoteNodeLogFileForApp,
+            this.remoteNodeTmpLogFileForApp,
+            logAggregationInRolling,
+            rollingMonitorInterval,
+            this.appId, this.appAcls, this.nodeId, this.userUgi);
   }
 
   private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
@@ -293,14 +301,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     logAggregationTimes++;
     String diagnosticMessage = "";
     boolean logAggregationSucceedInThisCycle = true;
-    try (LogWriter writer = createLogWriter()) {
+    try {
       try {
-        writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
-            this.userUgi);
-        // Write ACLs once when the writer is created.
-        writer.writeApplicationACLs(appAcls);
-        writer.writeApplicationOwner(this.userUgi.getShortUserName());
-
+        logAggregationFileController.initializeWriter(logControllerContext);
       } catch (IOException e1) {
         logAggregationSucceedInThisCycle = false;
         LOG.error("Cannot create writer for app " + this.applicationId
@@ -318,8 +321,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           containerLogAggregators.put(container, aggregator);
         }
         Set<Path> uploadedFilePathsInThisCycle =
-            aggregator.doContainerLogAggregation(writer, appFinished,
-            finishedContainers.contains(container));
+            aggregator.doContainerLogAggregation(logAggregationFileController,
+            appFinished, finishedContainers.contains(container));
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
           List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
@@ -337,60 +340,28 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         }
       }
 
-      // Before upload logs, make sure the number of existing logs
-      // is smaller than the configured NM log aggregation retention size.
-      if (uploadedLogsInThisCycle && logAggregationInRolling) {
-        cleanOldLogs();
-        cleanupOldLogTimes++;
-      }
-
-      long currentTime = System.currentTimeMillis();
-      final Path renamedPath = getRenamedPath(currentTime);
-
-      final boolean rename = uploadedLogsInThisCycle;
+      logControllerContext.setUploadedLogsInThisCycle(uploadedLogsInThisCycle);
+      logControllerContext.setLogUploadTimeStamp(System.currentTimeMillis());
+      logControllerContext.increLogAggregationTimes();
       try {
-        userUgi.doAs(new PrivilegedExceptionAction<Object>() {
-          @Override
-          public Object run() throws Exception {
-            FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
-            if (rename) {
-              remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
-            } else {
-              remoteFS.delete(remoteNodeTmpLogFileForApp, false);
-            }
-            return null;
-          }
-        });
-        diagnosticMessage =
-            "Log uploaded successfully for Application: " + appId
-                + " in NodeManager: "
-                + LogAggregationUtils.getNodeString(nodeId) + " at "
-                + Times.format(currentTime) + "\n";
+        this.logAggregationFileController.postWrite(logControllerContext);
+        diagnosticMessage = "Log uploaded successfully for Application: "
+            + appId + " in NodeManager: "
+            + LogAggregationUtils.getNodeString(nodeId) + " at "
+            + Times.format(logControllerContext.getLogUploadTimeStamp())
+            + "\n";
       } catch (Exception e) {
-        LOG.error(
-          "Failed to move temporary log file to final location: ["
-              + remoteNodeTmpLogFileForApp + "] to ["
-              + renamedPath + "]", e);
-        diagnosticMessage =
-            "Log uploaded failed for Application: " + appId
-                + " in NodeManager: "
-                + LogAggregationUtils.getNodeString(nodeId) + " at "
-                + Times.format(currentTime) + "\n";
+        diagnosticMessage = e.getMessage();
         renameTemporaryLogFileFailed = true;
         logAggregationSucceedInThisCycle = false;
       }
     } finally {
       sendLogAggregationReport(logAggregationSucceedInThisCycle,
           diagnosticMessage, appFinished);
+      logAggregationFileController.closeWriter();
     }
   }
 
-  private Path getRenamedPath(long currentTime) {
-    return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp
-        : new Path(remoteNodeLogFileForApp.getParent(),
-        remoteNodeLogFileForApp.getName() + "_" + currentTime);
-  }
-
   private void addCredentials() {
     if (UserGroupInformation.isSecurityEnabled()) {
       Credentials systemCredentials =
@@ -407,11 +378,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
   }
 
-  @VisibleForTesting
-  protected LogWriter createLogWriter() {
-    return new LogWriter();
-  }
-
   private void sendLogAggregationReport(
       boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
       boolean appFinished) {
@@ -442,60 +408,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.context.getLogAggregationStatusForApps().add(report);
   }
 
-  private void cleanOldLogs() {
-    try {
-      final FileSystem remoteFS =
-          this.remoteNodeLogFileForApp.getFileSystem(conf);
-      Path appDir =
-          this.remoteNodeLogFileForApp.getParent().makeQualified(
-            remoteFS.getUri(), remoteFS.getWorkingDirectory());
-      Set<FileStatus> status =
-          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
-
-      Iterable<FileStatus> mask =
-          Iterables.filter(status, new Predicate<FileStatus>() {
-            @Override
-            public boolean apply(FileStatus next) {
-              return next.getPath().getName()
-                .contains(LogAggregationUtils.getNodeString(nodeId))
-                && !next.getPath().getName().endsWith(
-                    LogAggregationUtils.TMP_FILE_SUFFIX);
-            }
-          });
-      status = Sets.newHashSet(mask);
-      // Normally, we just need to delete one oldest log
-      // before we upload a new log.
-      // If we can not delete the older logs in this cycle,
-      // we will delete them in next cycle.
-      if (status.size() >= this.retentionSize) {
-        // sort by the lastModificationTime ascending
-        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
-        Collections.sort(statusList, new Comparator<FileStatus>() {
-          public int compare(FileStatus s1, FileStatus s2) {
-            return s1.getModificationTime() < s2.getModificationTime() ? -1
-                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
-          }
-        });
-        for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
-          final FileStatus remove = statusList.get(i);
-          try {
-            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
-              @Override
-              public Object run() throws Exception {
-                remoteFS.delete(remove.getPath(), false);
-                return null;
-              }
-            });
-          } catch (Exception e) {
-            LOG.error("Failed to delete " + remove.getPath(), e);
-          }
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Failed to clean old logs", e);
-    }
-  }
-
   @SuppressWarnings("unchecked")
   @Override
   public void run() {
@@ -523,8 +435,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       synchronized(this) {
         try {
           waiting.set(true);
-          if (logAggregationInRolling) {
-            wait(this.rollingMonitorInterval * 1000);
+          if (logControllerContext.isLogAggregationInRolling()) {
+            wait(logControllerContext.getRollingMonitorInterval() * 1000);
             if (this.appFinishing.get() || this.aborted.get()) {
               break;
             }
@@ -653,7 +565,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           recoveredLogInitedTime, logRetentionSecs * 1000);
     }
 
-    public Set<Path> doContainerLogAggregation(LogWriter writer,
+    public Set<Path> doContainerLogAggregation(
+        LogAggregationFileController logAggregationFileController,
         boolean appFinished, boolean containerFinished) {
       LOG.info("Uploading logs for container " + containerId
           + ". Current good log dirs are "
@@ -665,7 +578,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             this.uploadedFileMeta,  retentionContext, appFinished,
             containerFinished);
       try {
-        writer.append(logKey, logValue);
+        logAggregationFileController.write(logKey, logValue);
       } catch (Exception e) {
         LOG.error("Couldn't upload logs for " + containerId
             + ". Skipping this container.", e);
@@ -708,4 +621,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   int getCleanupOldLogTimes() {
     return this.cleanupOldLogTimes;
   }
+
+  @VisibleForTesting
+  public LogAggregationFileController getLogAggregationFileController() {
+    return this.logAggregationFileController;
+  }
+
+  @VisibleForTesting
+  public LogAggregationFileControllerContext
+      getLogAggregationFileControllerContext() {
+    return this.logControllerContext;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12358c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index aafd7d8..1a59e45 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -18,9 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -32,10 +30,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
@@ -48,7 +43,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -79,36 +75,14 @@ public class LogAggregationService extends AbstractService implements
       = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
   private long rollingMonitorInterval;
 
-  /*
-   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
-   * Group to which NMOwner belongs> App dirs will be created as 770,
-   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
-   * access / modify the files.
-   * <NMGroup> should obviously be a limited access group.
-   */
-  /**
-   * Permissions for the top level directory under which app directories will be
-   * created.
-   */
-  private static final FsPermission TLDIR_PERMISSIONS = FsPermission
-      .createImmutable((short) 01777);
-  /**
-   * Permissions for the Application directory.
-   */
-  private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
-      .createImmutable((short) 0770);
-
   private final Context context;
   private final DeletionService deletionService;
   private final Dispatcher dispatcher;
 
   private LocalDirsHandlerService dirsHandler;
-  Path remoteRootLogDir;
-  String remoteRootLogDirSuffix;
   private NodeId nodeId;
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
-  private boolean logPermError = true;
 
   @VisibleForTesting
   ExecutorService threadPool;
@@ -125,12 +99,6 @@ public class LogAggregationService extends AbstractService implements
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
-    this.remoteRootLogDir =
-        new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    this.remoteRootLogDirSuffix =
-        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
     int threadPoolSize = getAggregatorThreadPoolSize(conf);
     this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
         new ThreadFactoryBuilder()
@@ -218,158 +186,6 @@ public class LogAggregationService extends AbstractService implements
     }
   }
 
-  protected FileSystem getFileSystem(Configuration conf) throws IOException {
-    return this.remoteRootLogDir.getFileSystem(conf);
-  }
-
-  void verifyAndCreateRemoteLogDir(Configuration conf) {
-    // Checking the existence of the TLD
-    FileSystem remoteFS = null;
-    try {
-      remoteFS = getFileSystem(conf);
-    } catch (IOException e) {
-      throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
-    }
-    boolean remoteExists = true;
-    try {
-      FsPermission perms =
-          remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
-      if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
-        LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
-            + "] already exist, but with incorrect permissions. "
-            + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
-            + "]." + " The cluster may have problems with multiple users.");
-        logPermError = false;
-      } else {
-        logPermError = true;
-      }
-    } catch (FileNotFoundException e) {
-      remoteExists = false;
-    } catch (IOException e) {
-      throw new YarnRuntimeException(
-          "Failed to check permissions for dir ["
-              + this.remoteRootLogDir + "]", e);
-    }
-    if (!remoteExists) {
-      LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
-          + "] does not exist. Attempting to create it.");
-      try {
-        Path qualified =
-            this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
-                remoteFS.getWorkingDirectory());
-        remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
-        remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
-
-        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
-        String primaryGroupName = null;
-        try {
-          primaryGroupName = loginUser.getPrimaryGroupName();
-        } catch (IOException e) {
-          LOG.warn("No primary group found. The remote root log directory" +
-              " will be created with the HDFS superuser being its group " +
-              "owner. JobHistoryServer may be unable to read the directory.");
-        }
-        // set owner on the remote directory only if the primary group exists
-        if (primaryGroupName != null) {
-          remoteFS.setOwner(qualified,
-              loginUser.getShortUserName(), primaryGroupName);
-        }
-      } catch (IOException e) {
-        throw new YarnRuntimeException("Failed to create remoteLogDir ["
-            + this.remoteRootLogDir + "]", e);
-      }
-    }
-  }
-
-  Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
-    return LogAggregationUtils.getRemoteNodeLogFileForApp(
-        this.remoteRootLogDir, appId, user, this.nodeId,
-        this.remoteRootLogDirSuffix);
-  }
-
-  Path getRemoteAppLogDir(ApplicationId appId, String user) {
-    return LogAggregationUtils.getRemoteAppLogDir(this.remoteRootLogDir, appId,
-        user, this.remoteRootLogDirSuffix);
-  }
-
-  private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
-      throws IOException {
-    FsPermission dirPerm = new FsPermission(fsPerm);
-    fs.mkdirs(path, dirPerm);
-    FsPermission umask = FsPermission.getUMask(fs.getConf());
-    if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
-      fs.setPermission(path, new FsPermission(fsPerm));
-    }
-  }
-
-  private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
-      throws IOException {
-    boolean exists = true;
-    try {
-      FileStatus appDirStatus = fs.getFileStatus(path);
-      if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
-        fs.setPermission(path, APP_DIR_PERMISSIONS);
-      }
-    } catch (FileNotFoundException fnfe) {
-      exists = false;
-    }
-    return exists;
-  }
-
-  protected void createAppDir(final String user, final ApplicationId appId,
-      UserGroupInformation userUgi) {
-    try {
-      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          try {
-            // TODO: Reuse FS for user?
-            FileSystem remoteFS = getFileSystem(getConfig());
-
-            // Only creating directories if they are missing to avoid
-            // unnecessary load on the filesystem from all of the nodes
-            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
-                LogAggregationService.this.remoteRootLogDir, appId, user,
-                LogAggregationService.this.remoteRootLogDirSuffix);
-            appDir = appDir.makeQualified(remoteFS.getUri(),
-                remoteFS.getWorkingDirectory());
-
-            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
-              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
-                  LogAggregationService.this.remoteRootLogDir, user,
-                  LogAggregationService.this.remoteRootLogDirSuffix);
-              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
-                  remoteFS.getWorkingDirectory());
-
-              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
-                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
-                    LogAggregationService.this.remoteRootLogDir, user);
-                userDir = userDir.makeQualified(remoteFS.getUri(),
-                    remoteFS.getWorkingDirectory());
-
-                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
-                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
-                }
-
-                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
-              }
-
-              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
-            }
-
-          } catch (IOException e) {
-            LOG.error("Failed to setup application log directory for "
-                + appId, e);
-            throw e;
-          }
-          return null;
-        }
-      });
-    } catch (Exception e) {
-      throw new YarnRuntimeException(e);
-    }
-  }
-
   @SuppressWarnings("unchecked")
   private void initApp(final ApplicationId appId, String user,
       Credentials credentials, Map<ApplicationAccessType, String> appAcls,
@@ -377,7 +193,6 @@ public class LogAggregationService extends AbstractService implements
       long recoveredLogInitedTime) {
     ApplicationEvent eventResponse;
     try {
-      verifyAndCreateRemoteLogDir(getConfig());
       initAppAggregator(appId, user, credentials, appAcls,
           logAggregationContext, recoveredLogInitedTime);
       eventResponse = new ApplicationEvent(appId,
@@ -410,14 +225,17 @@ public class LogAggregationService extends AbstractService implements
       userUgi.addCredentials(credentials);
     }
 
+    LogAggregationFileController logAggregationFileController
+        = getLogAggregationFileController(getConfig());
+    logAggregationFileController.verifyAndCreateRemoteLogDir();
     // New application
     final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
-            getRemoteNodeLogFileForApp(appId, user),
-            appAcls, logAggregationContext, this.context,
+            logAggregationFileController.getRemoteNodeLogFileForApp(appId,
+            user, nodeId), appAcls, logAggregationContext, this.context,
             getLocalFileContext(getConfig()), this.rollingMonitorInterval,
-            recoveredLogInitedTime);
+            recoveredLogInitedTime, logAggregationFileController);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }
@@ -425,7 +243,7 @@ public class LogAggregationService extends AbstractService implements
     YarnRuntimeException appDirException = null;
     try {
       // Create the app dir
-      createAppDir(user, appId, userUgi);
+      logAggregationFileController.createAppDir(user, appId, userUgi);
     } catch (Exception e) {
       appLogAggregator.disableLogAggregation();
       if (!(e instanceof YarnRuntimeException)) {
@@ -570,4 +388,14 @@ public class LogAggregationService extends AbstractService implements
     }
     return threadPoolSize;
   }
+
+  @VisibleForTesting
+  public LogAggregationFileController getLogAggregationFileController(
+      Configuration conf) {
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(conf);
+    LogAggregationFileController logAggregationFileController = factory
+        .getFileControllerForWrite();
+    return logAggregationFileController;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12358c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/.TestLogAggregationService.java.swp
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/.TestLogAggregationService.java.swp b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/.TestLogAggregationService.java.swp
new file mode 100644
index 0000000..e9172b8
Binary files /dev/null and b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/.TestLogAggregationService.java.swp differ

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12358c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index b6d6ab1..45b1771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
@@ -241,8 +241,8 @@ public class TestAppLogAggregatorImpl {
     // verify uploaded files
     ArgumentCaptor<LogValue> logValCaptor =
         ArgumentCaptor.forClass(LogValue.class);
-    verify(appLogAggregator.logWriter).append(any(LogKey.class),
-        logValCaptor.capture());
+    verify(appLogAggregator.getLogAggregationFileController()).write(
+        any(LogKey.class), logValCaptor.capture());
     Set<String> filesUploaded = new HashSet<>();
     LogValue logValue = logValCaptor.getValue();
     for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
@@ -287,11 +287,13 @@ public class TestAppLogAggregatorImpl {
     final Context context = createContext(config);
     final FileContext fakeLfs = mock(FileContext.class);
     final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
-
+    LogAggregationTFileController format = spy(
+        new LogAggregationTFileController());
+    format.initialize(config, "TFile");
     return new AppLogAggregatorInTest(dispatcher, deletionService,
         config, applicationId, ugi, nodeId, dirsService,
         remoteLogDirForApp, appAcls, logAggregationContext,
-        context, fakeLfs, recoveredLogInitedTimeMillis);
+        context, fakeLfs, recoveredLogInitedTimeMillis, format);
   }
 
   /**
@@ -402,7 +404,6 @@ public class TestAppLogAggregatorImpl {
 
     final DeletionService deletionService;
     final ApplicationId applicationId;
-    final LogWriter logWriter;
     final ArgumentCaptor<LogValue> logValue;
 
     public AppLogAggregatorInTest(Dispatcher dispatcher,
@@ -411,19 +412,15 @@ public class TestAppLogAggregatorImpl {
         LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
         Map<ApplicationAccessType, String> appAcls,
         LogAggregationContext logAggregationContext, Context context,
-        FileContext lfs, long recoveredLogInitedTime) throws IOException {
+        FileContext lfs, long recoveredLogInitedTime,
+        LogAggregationTFileController format) throws IOException {
       super(dispatcher, deletionService, conf, appId, ugi, nodeId,
           dirsHandler, remoteNodeLogFileForApp, appAcls,
-          logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
+          logAggregationContext, context, lfs, -1, recoveredLogInitedTime,
+          format);
       this.applicationId = appId;
       this.deletionService = deletionService;
-      this.logWriter = spy(new LogWriter());
       this.logValue = ArgumentCaptor.forClass(LogValue.class);
     }
-
-    @Override
-    protected LogWriter createLogWriter() {
-      return this.logWriter;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e12358c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 6193f81..cf94176 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -103,6 +103,9 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@@ -161,11 +164,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   DrainDispatcher dispatcher;
   EventHandler<ApplicationEvent> appEventHandler;
 
+  private NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
+
   @Override
   @SuppressWarnings("unchecked")
   public void setup() throws IOException {
     super.setup();
-    NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
     ((NMContext)context).setNodeId(nodeId);
     dispatcher = createDispatcher();
     appEventHandler = mock(EventHandler.class);
@@ -246,9 +250,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
       app1LogDir.exists());
 
-    Path logFilePath =
-        logAggregationService.getRemoteNodeLogFileForApp(application1,
-            this.user);
+    Path logFilePath = logAggregationService
+        .getLogAggregationFileController(conf)
+        .getRemoteNodeLogFileForApp(application1, this.user, nodeId);
 
     Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
         logFilePath.toUri().getPath()).exists());
@@ -369,9 +373,10 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
-
-    Assert.assertFalse(new File(logAggregationService
-        .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
+    LogAggregationFileController format1 =
+        logAggregationService.getLogAggregationFileController(conf);
+    Assert.assertFalse(new File(format1.getRemoteNodeLogFileForApp(
+        application1, this.user, this.nodeId).toUri().getPath())
         .exists());
 
     dispatcher.await();
@@ -541,26 +546,33 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     };
     checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
   }
-  
+
   @Test
   public void testVerifyAndCreateRemoteDirsFailure()
       throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
-    
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(conf);
+    LogAggregationFileController logAggregationFileFormat = factory
+        .getFileControllerForWrite();
+    final LogAggregationFileController spyLogAggregationFileFormat =
+        spy(logAggregationFileFormat);
+    YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
+    doThrow(e).doNothing().when(spyLogAggregationFileFormat)
+        .verifyAndCreateRemoteLogDir();
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
-                                  super.dirsHandler));
+            super.dirsHandler) {
+        @Override
+        public LogAggregationFileController getLogAggregationFileController(
+            Configuration conf) {
+          return spyLogAggregationFileFormat;
+        }
+      });
     logAggregationService.init(this.conf);
-    
-    YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
-    doThrow(e)
-      .when(logAggregationService).verifyAndCreateRemoteLogDir(
-          any(Configuration.class));
-        
     logAggregationService.start();
-    
     // Now try to start an application
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
@@ -607,8 +619,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     
     logAggregationService.stop();
   }
-  
-  
+
   @Test
   public void testVerifyAndCreateRemoteDirNonExistence()
       throws Exception {
@@ -621,14 +632,24 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new LogAggregationService(dispatcher, this.context, this.delSrvc,
                                   super.dirsHandler));
     logAggregationService.init(this.conf);
+    logAggregationService.start();
     boolean existsBefore = aNewFile.exists();
     assertTrue("The new file already exists!", !existsBefore);
 
-    logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
-    
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null, this.acls, contextWithAMAndFailed));
+    dispatcher.await();
+
     boolean existsAfter = aNewFile.exists();
     assertTrue("The new aggregate file is not successfully created", existsAfter);
     aNewFile.delete(); //housekeeping
+    logAggregationService.stop();
   }
 
   @Test
@@ -641,7 +662,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     LogAggregationService logAggregationService = new LogAggregationService(
         dispatcher, this.context, this.delSrvc, super.dirsHandler);
     logAggregationService.init(this.conf);
-    logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
+    logAggregationService.start();
+
+    ApplicationId appId = ApplicationId.newInstance(
+        System.currentTimeMillis(), 1);
+    LogAggregationContext contextWithAMAndFailed =
+        Records.newRecord(LogAggregationContext.class);
+    contextWithAMAndFailed.setLogAggregationPolicyClassName(
+        AMOrFailedContainerLogAggregationPolicy.class.getName());
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null, this.acls, contextWithAMAndFailed));
+    dispatcher.await();
 
     String targetGroup =
         UserGroupInformation.getLoginUser().getPrimaryGroupName();
@@ -651,6 +682,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         fileStatus.getGroup(), targetGroup);
 
     fs.delete(aNewFile, true);
+    logAggregationService.stop();
   }
 
   @Test
@@ -669,14 +701,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     FileSystem fs = FileSystem.get(this.conf);
     final FileSystem spyFs = spy(FileSystem.get(this.conf));
 
+    final LogAggregationTFileController spyFileFormat
+        = new LogAggregationTFileController() {
+          @Override
+          public FileSystem getFileSystem(Configuration conf)
+              throws IOException {
+            return spyFs;
+          }
+        };
+    spyFileFormat.initialize(conf, "TFile");
     LogAggregationService aggSvc = new LogAggregationService(dispatcher,
         this.context, this.delSrvc, super.dirsHandler) {
       @Override
-      protected FileSystem getFileSystem(Configuration conf) {
-        return spyFs;
+      public LogAggregationFileController getLogAggregationFileController(
+          Configuration conf) {
+        return spyFileFormat;
       }
     };
-
     aggSvc.init(this.conf);
     aggSvc.start();
 
@@ -769,18 +810,36 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
   @Test
   public void testLogAggregationCreateDirsFailsWithoutKillingNM()
       throws Exception {
-    
-    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS,
+        localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
 
     DeletionService spyDelSrvc = spy(this.delSrvc);
+    LogAggregationFileControllerFactory factory
+        = new LogAggregationFileControllerFactory(conf);
+    LogAggregationFileController logAggregationFileFormat = factory
+        .getFileControllerForWrite();
+    final LogAggregationFileController spyLogAggregationFileFormat =
+        spy(logAggregationFileFormat);
+    Exception e = new RuntimeException("KABOOM!");
+    doThrow(e).when(spyLogAggregationFileFormat)
+        .createAppDir(any(String.class), any(ApplicationId.class),
+            any(UserGroupInformation.class));
     LogAggregationService logAggregationService = spy(
         new LogAggregationService(dispatcher, this.context, spyDelSrvc,
-                                  super.dirsHandler));
+            super.dirsHandler){
+        @Override
+        public LogAggregationFileController getLogAggregationFileController(
+            Configuration conf) {
+          return spyLogAggregationFileFormat;
+        }
+      });
+
     logAggregationService.init(this.conf);
     logAggregationService.start();
-    
+
     ApplicationId appId =
         BuilderUtils.newApplicationId(System.currentTimeMillis(),
           (int) (Math.random() * 1000));
@@ -789,10 +848,6 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new File(localLogDir, appId.toString());
     appLogDir.mkdir();
 
-    Exception e = new RuntimeException("KABOOM!");
-    doThrow(e)
-      .when(logAggregationService).createAppDir(any(String.class),
-          any(ApplicationId.class), any(UserGroupInformation.class));
     LogAggregationContext contextWithAMAndFailed =
         Records.newRecord(LogAggregationContext.class);
     contextWithAMAndFailed.setLogAggregationPolicyClassName(
@@ -867,7 +922,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       int minNumOfContainers, int maxNumOfContainers,
       String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
     throws IOException {
-    Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
+    Path appLogDir = logAggregationService.getLogAggregationFileController(
+        conf).getRemoteAppLogDir(appId, this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
       Path qualifiedLogDir =
@@ -2108,7 +2164,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     }
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
         this.user, null, this.acls, logAggContext));
-
+    dispatcher.await();
     return logAggregationService;
   }
 
@@ -2462,17 +2518,20 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.stop();
 
     assertEquals(expectedLogAggregationTimes,
-        aggregator.getLogAggregationTimes());
+        aggregator.getLogAggregationFileControllerContext()
+        .getLogAggregationTimes());
     assertEquals(expectedAggregationReportNum,
         this.context.getLogAggregationStatusForApps().size());
     assertEquals(expectedCleanupOldLogsTimes,
-        aggregator.getCleanupOldLogTimes());
+        aggregator.getLogAggregationFileControllerContext()
+        .getCleanOldLogsTimes());
   }
 
   private int numOfLogsAvailable(LogAggregationService logAggregationService,
       ApplicationId appId, boolean sizeLimited, String lastLogFile)
       throws IOException {
-    Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
+    Path appLogDir = logAggregationService.getLogAggregationFileController(
+        conf).getRemoteAppLogDir(appId, this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
       Path qualifiedLogDir =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message