hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject hadoop git commit: YARN-5237. Fix missing log files issue in rolling log aggregation. Contributed by Xuan Gong.
Date Thu, 16 Jun 2016 14:15:10 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 ea2e7321d -> 810470508


YARN-5237. Fix missing log files issue in rolling log aggregation. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81047050
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81047050
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81047050

Branch: refs/heads/branch-2.8
Commit: 810470508bacfddcbb54241e27046168edca2f48
Parents: ea2e732
Author: Junping Du <junping_du@apache.org>
Authored: Thu Jun 16 07:18:36 2016 -0700
Committer: Junping Du <junping_du@apache.org>
Committed: Thu Jun 16 07:18:36 2016 -0700

----------------------------------------------------------------------
 .../logaggregation/AggregatedLogFormat.java     |  47 ++++++---
 .../logaggregation/AppLogAggregatorImpl.java    |   9 +-
 .../TestLogAggregationService.java              | 100 ++++++++++++++++++-
 3 files changed, 134 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81047050/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index f5dbc92..5051731 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -168,17 +168,20 @@ public class AggregatedLogFormat {
     private final Set<String> alreadyUploadedLogFiles;
     private Set<String> allExistingFileMeta = new HashSet<String>();
     private final boolean appFinished;
+    private final boolean containerFinished;
     // TODO Maybe add a version string here. Instead of changing the version of
     // the entire k-v format
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user) {
-      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
+      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true,
+          true);
     }
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user, LogAggregationContext logAggregationContext,
-        Set<String> alreadyUploadedLogFiles, boolean appFinished) {
+        Set<String> alreadyUploadedLogFiles, boolean appFinished,
+        boolean containerFinished) {
       this.rootLogDirs = new ArrayList<String>(rootLogDirs);
       this.containerId = containerId;
       this.user = user;
@@ -188,6 +191,7 @@ public class AggregatedLogFormat {
       this.logAggregationContext = logAggregationContext;
       this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
       this.appFinished = appFinished;
+      this.containerFinished = containerFinished;
     }
 
     private Set<File> getPendingLogFilesToUploadForThisContainer() {
@@ -294,28 +298,39 @@ public class AggregatedLogFormat {
         this.allExistingFileMeta.add(getLogFileMetaData(logFile));
       }
 
+      Set<File> fileCandidates = new HashSet<File>(candidates);
       if (this.logAggregationContext != null && candidates.size() > 0) {
-        filterFiles(
-          this.appFinished ? this.logAggregationContext.getIncludePattern()
+        fileCandidates = getFileCandidates(fileCandidates, this.appFinished);
+        if (!this.appFinished && this.containerFinished) {
+          Set<File> addition = new HashSet<File>(candidates);
+          addition = getFileCandidates(addition, true);
+          fileCandidates.addAll(addition);
+        }
+      }
+      return fileCandidates;
+    }
+
+    private Set<File> getFileCandidates(Set<File> candidates,
+        boolean useRegularPattern) {
+      filterFiles(
+          useRegularPattern ? this.logAggregationContext.getIncludePattern()
               : this.logAggregationContext.getRolledLogsIncludePattern(),
           candidates, false);
 
-        filterFiles(
-          this.appFinished ? this.logAggregationContext.getExcludePattern()
+      filterFiles(
+          useRegularPattern ? this.logAggregationContext.getExcludePattern()
               : this.logAggregationContext.getRolledLogsExcludePattern(),
           candidates, true);
 
-        Iterable<File> mask =
-            Iterables.filter(candidates, new Predicate<File>() {
-              @Override
-              public boolean apply(File next) {
-                return !alreadyUploadedLogFiles
+      Iterable<File> mask =
+          Iterables.filter(candidates, new Predicate<File>() {
+            @Override
+            public boolean apply(File next) {
+              return !alreadyUploadedLogFiles
                   .contains(getLogFileMetaData(next));
-              }
-            });
-        candidates = Sets.newHashSet(mask);
-      }
-      return candidates;
+            }
+          });
+      return Sets.newHashSet(mask);
     }
 
     private void filterFiles(String pattern, Set<File> candidates,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81047050/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index a5b1e2c..fa07c59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -348,7 +348,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           containerLogAggregators.put(container, aggregator);
         }
         Set<Path> uploadedFilePathsInThisCycle =
-            aggregator.doContainerLogAggregation(writer, appFinished);
+            aggregator.doContainerLogAggregation(writer, appFinished,
+            finishedContainers.contains(container));
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
           this.delService.delete(this.userUgi.getShortUserName(), null,
@@ -650,15 +651,15 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
 
     public Set<Path> doContainerLogAggregation(LogWriter writer,
-        boolean appFinished) {
+        boolean appFinished, boolean containerFinished) {
       LOG.info("Uploading logs for container " + containerId
           + ". Current good log dirs are "
           + StringUtils.join(",", dirsHandler.getLogDirsForRead()));
       final LogKey logKey = new LogKey(containerId);
       final LogValue logValue =
           new LogValue(dirsHandler.getLogDirsForRead(), containerId,
-            userUgi.getShortUserName(), logAggregationContext,
-            this.uploadedFileMeta, appFinished);
+              userUgi.getShortUserName(), logAggregationContext,
+              this.uploadedFileMeta, appFinished, containerFinished);
       try {
         writer.append(logKey, logValue);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81047050/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index d56d030..c0a3f3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -1435,6 +1435,102 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       "getApplicationID");
   }
 
+  @SuppressWarnings("resource")
+  @Test (timeout = 50000)
+  public void testLogAggregationServiceWithPatternsAndIntervals()
+      throws Exception {
+    LogAggregationContext logAggregationContext =
+        Records.newRecord(LogAggregationContext.class);
+    // set IncludePattern and RolledLogsIncludePattern.
+    // When the app is running, we only aggregate the log with
+    // the name stdout. After the app finishes, we only aggregate
+    // the log with the name std_final.
+    logAggregationContext.setRolledLogsIncludePattern("stdout");
+    logAggregationContext.setIncludePattern("std_final");
+    this.conf.set(
+        YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    //configure YarnConfiguration.NM_REMOTE_APP_LOG_DIR to
+    //have fully qualified path
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.toURI().toString());
+    this.conf.setLong(
+        YarnConfiguration.NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS,
+        3600);
+
+    this.conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
+
+    ApplicationId application =
+        BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(application, 1);
+    ContainerId container = createContainer(appAttemptId, 1,
+        ContainerType.APPLICATION_MASTER);
+
+    ConcurrentMap<ApplicationId, Application> maps =
+        this.context.getApplications();
+    Application app = mock(Application.class);
+    maps.put(application, app);
+    when(app.getContainers()).thenReturn(this.context.getContainers());
+
+    LogAggregationService logAggregationService =
+        new LogAggregationService(dispatcher, context, this.delSrvc,
+          super.dirsHandler);
+
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    // AppLogDir should be created
+    File appLogDir =
+        new File(localLogDir, ConverterUtils.toString(application));
+    appLogDir.mkdir();
+    logAggregationService.handle(new LogHandlerAppStartedEvent(application,
+        this.user, null, this.acls, logAggregationContext));
+
+    // Simulate log-file creation
+    // create std_final in log directory which will not be aggregated
+    // until the app finishes.
+    String[] logFilesWithFinalLog =
+        new String[] {"stdout", "std_final"};
+    writeContainerLogs(appLogDir, container, logFilesWithFinalLog);
+
+    // Do log aggregation
+    AppLogAggregatorImpl aggregator =
+        (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators()
+        .get(application);
+
+    aggregator.doLogAggregationOutOfBand();
+
+    Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 1, false, null));
+
+    String[] logFiles = new String[] { "stdout" };
+    verifyContainerLogs(logAggregationService, application,
+        new ContainerId[] {container}, logFiles, 1, true);
+
+    logAggregationService.handle(
+        new LogHandlerContainerFinishedEvent(container, 0));
+
+    dispatcher.await();
+
+    // Do the log aggregation after ContainerFinishedEvent but before
+    // AppFinishedEvent. The std_final is expected to be aggregated this time
+    // even if the app is running but the container finishes.
+    aggregator.doLogAggregationOutOfBand();
+
+    Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 2, false, null));
+
+    // This container finishes.
+    // The log "std_final" should be aggregated this time.
+    String[] logFinalLog = new String[] {"std_final"};
+    verifyContainerLogs(logAggregationService, application,
+        new ContainerId[] {container}, logFinalLog, 1, true);
+
+    logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
+
+    logAggregationService.stop();
+  }
+
   @Test (timeout = 50000)
   @SuppressWarnings("unchecked")
   public void testNoneContainerPolicy() throws Exception {
@@ -1443,14 +1539,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     LogAggregationService logAggregationService = createLogAggregationService(
         appId, NoneContainerLogAggregationPolicy.class, null);
 
-    String[] logFiles = new String[] { "stdout" };
+    String[] logFiles = new String[] {"stdout"};
     ContainerId container1 = finishContainer(appId, logAggregationService,
         ContainerType.APPLICATION_MASTER, 1, 0, logFiles);
 
     finishApplication(appId, logAggregationService);
 
     verifyContainerLogs(logAggregationService, appId,
-        new ContainerId[] { container1 }, logFiles, 0, false);
+        new ContainerId[] {container1}, logFiles, 0, false);
 
     verifyLogAggFinishEvent(appId);
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message