hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject hadoop git commit: YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating running logs of application when rolling is enabled. Contributed by Xuan Gong.
Date Mon, 18 Jan 2016 02:04:01 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 b05bb4984 -> a5b85634b


YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating running logs
of application when rolling is enabled. Contributed by Xuan Gong.

(cherry picked from commit 863079bb874ba77918ca1c0741eae10e245995c8)
(cherry picked from commit 53aa3a4d1f2e02ab60fd8b4485286b57df5fcdf9)

Conflicts:

	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a5b85634
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a5b85634
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a5b85634

Branch: refs/heads/branch-2.6
Commit: a5b85634b4e35e195b32a7501a59ce9a869d1097
Parents: b05bb49
Author: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Authored: Thu Mar 12 13:32:29 2015 -0700
Committer: Junping Du <junping_du@apache.org>
Committed: Sun Jan 17 18:12:40 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../yarn/api/records/LogAggregationContext.java | 79 ++++++++++++++++++--
 .../src/main/proto/yarn_protos.proto            |  2 +
 .../impl/pb/LogAggregationContextPBImpl.java    | 39 ++++++++++
 .../logaggregation/AggregatedLogFormat.java     | 39 +++++-----
 .../logaggregation/AppLogAggregatorImpl.java    | 22 ++++--
 .../TestContainerManagerRecovery.java           | 10 ++-
 .../TestLogAggregationService.java              | 73 +++++++++++++++---
 .../capacity/TestContainerAllocation.java       |  8 +-
 9 files changed, 227 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 31bf007..a9ad1e2 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -380,6 +380,9 @@ Release 2.6.1 - 2015-09-23
     YARN-2890. MiniYarnCluster should turn on timeline service if
     configured to do so. (Mit Desai via hitesh)
 
+    YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
+    running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
index 46c1809..e582d2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
@@ -32,11 +32,20 @@ import org.apache.hadoop.yarn.util.Records;
  *   <ul>
  *     <li>includePattern. It uses Java Regex to filter the log files
  *     which match the defined include pattern and those log files
- *     will be uploaded. </li>
+ *     will be uploaded when the application finishes. </li>
  *     <li>excludePattern. It uses Java Regex to filter the log files
  *     which match the defined exclude pattern and those log files
- *     will not be uploaded. If the log file name matches both the
- *     include and the exclude pattern, this file will be excluded eventually</li>
+ *     will not be uploaded when application finishes. If the log file
+ *     name matches both the include and the exclude pattern, this file
+ *     will be excluded eventually</li>
+ *     <li>rolledLogsIncludePattern. It uses Java Regex to filter the log files
+ *     which match the defined include pattern and those log files
+ *     will be aggregated in a rolling fashion.</li>
+ *     <li>rolledLogsExcludePattern. It uses Java Regex to filter the log files
+ *     which match the defined exclude pattern and those log files
+ *     will not be aggregated in a rolling fashion. If the log file
+ *     name matches both the include and the exclude pattern, this file
+ *     will be excluded eventually</li>
  *   </ul>
  * </p>
  *
@@ -57,8 +66,23 @@ public abstract class LogAggregationContext {
     return context;
   }
 
+  @Public
+  @Unstable
+  public static LogAggregationContext newInstance(String includePattern,
+      String excludePattern, String rolledLogsIncludePattern,
+      String rolledLogsExcludePattern) {
+    LogAggregationContext context =
+        Records.newRecord(LogAggregationContext.class);
+    context.setIncludePattern(includePattern);
+    context.setExcludePattern(excludePattern);
+    context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+    context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+    return context;
+  }
+
   /**
-   * Get include pattern
+   * Get include pattern. This includePattern only takes effect
+   * on logs that exist at the time of application finish.
    *
    * @return include pattern
    */
@@ -67,7 +91,8 @@ public abstract class LogAggregationContext {
   public abstract String getIncludePattern();
 
   /**
-   * Set include pattern
+   * Set include pattern. This includePattern only takes effect
+   * on logs that exist at the time of application finish.
    *
    * @param includePattern
    */
@@ -76,7 +101,8 @@ public abstract class LogAggregationContext {
   public abstract void setIncludePattern(String includePattern);
 
   /**
-   * Get exclude pattern
+   * Get exclude pattern. This excludePattern only takes effect
+   * on logs that exist at the time of application finish.
    *
    * @return exclude pattern
    */
@@ -85,11 +111,50 @@ public abstract class LogAggregationContext {
   public abstract String getExcludePattern();
 
   /**
-   * Set exclude pattern
+   * Set exclude pattern. This excludePattern only takes effect
+   * on logs that exist at the time of application finish.
    *
    * @param excludePattern
    */
   @Public
   @Unstable
   public abstract void setExcludePattern(String excludePattern);
+
+  /**
+   * Get include pattern in a rolling fashion.
+   * 
+   * @return include pattern
+   */
+  @Public
+  @Unstable
+  public abstract String getRolledLogsIncludePattern();
+
+  /**
+   * Set include pattern in a rolling fashion.
+   * 
+   * @param rolledLogsIncludePattern
+   */
+  @Public
+  @Unstable
+  public abstract void setRolledLogsIncludePattern(
+      String rolledLogsIncludePattern);
+
+  /**
+   * Get exclude pattern for aggregation in a rolling fashion.
+   * 
+   * @return exclude pattern
+   */
+  @Public
+  @Unstable
+  public abstract String getRolledLogsExcludePattern();
+
+  /**
+   * Set exclude pattern for aggregation in a rolling fashion.
+   * 
+   * @param rolledLogsExcludePattern
+   */
+  @Public
+  @Unstable
+  public abstract void setRolledLogsExcludePattern(
+      String rolledLogsExcludePattern);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index db01770..8307a7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -308,6 +308,8 @@ message ApplicationSubmissionContextProto {
 message LogAggregationContextProto {
  optional string include_pattern = 1 [default = ".*"];
  optional string exclude_pattern = 2 [default = ""];
+ optional string rolled_logs_include_pattern = 3 [default = ""];
+ optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
 }
 
 enum ApplicationAccessTypeProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
index dc7a21d..f6409bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
+
 import com.google.protobuf.TextFormat;
 
 public class LogAggregationContextPBImpl extends LogAggregationContext{
@@ -116,4 +117,42 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
     }
     builder.setExcludePattern(excludePattern);
   }
+
+  @Override
+  public String getRolledLogsIncludePattern() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasRolledLogsIncludePattern()) {
+      return null;
+    }
+    return p.getRolledLogsIncludePattern();
+  }
+
+  @Override
+  public void setRolledLogsIncludePattern(String rolledLogsIncludePattern) {
+    maybeInitBuilder();
+    if (rolledLogsIncludePattern == null) {
+      builder.clearRolledLogsIncludePattern();
+      return;
+    }
+    builder.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+  }
+
+  @Override
+  public String getRolledLogsExcludePattern() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasRolledLogsExcludePattern()) {
+      return null;
+    }
+    return p.getRolledLogsExcludePattern();
+  }
+
+  @Override
+  public void setRolledLogsExcludePattern(String rolledLogsExcludePattern) {
+    maybeInitBuilder();
+    if (rolledLogsExcludePattern == null) {
+      builder.clearRolledLogsExcludePattern();
+      return;
+    }
+    builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index 7eebcb3..f5cf5fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -166,17 +166,18 @@ public class AggregatedLogFormat {
     private Set<File> uploadedFiles = new HashSet<File>();
     private final Set<String> alreadyUploadedLogFiles;
     private Set<String> allExistingFileMeta = new HashSet<String>();
+    private final boolean appFinished;
     // TODO Maybe add a version string here. Instead of changing the version of
     // the entire k-v format
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user) {
-      this(rootLogDirs, containerId, user, null, new HashSet<String>());
+      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
     }
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user, LogAggregationContext logAggregationContext,
-        Set<String> alreadyUploadedLogFiles) {
+        Set<String> alreadyUploadedLogFiles, boolean appFinished) {
       this.rootLogDirs = new ArrayList<String>(rootLogDirs);
       this.containerId = containerId;
       this.user = user;
@@ -185,6 +186,7 @@ public class AggregatedLogFormat {
       Collections.sort(this.rootLogDirs);
       this.logAggregationContext = logAggregationContext;
       this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
+      this.appFinished = appFinished;
     }
 
     private Set<File> getPendingLogFilesToUploadForThisContainer() {
@@ -295,17 +297,15 @@ public class AggregatedLogFormat {
       }
 
       if (this.logAggregationContext != null && candidates.size() > 0) {
-        if (this.logAggregationContext.getIncludePattern() != null
-            && !this.logAggregationContext.getIncludePattern().isEmpty()) {
-          filterFiles(this.logAggregationContext.getIncludePattern(),
-              candidates, false);
-        }
+        filterFiles(
+          this.appFinished ? this.logAggregationContext.getIncludePattern()
+              : this.logAggregationContext.getRolledLogsIncludePattern(),
+          candidates, false);
 
-        if (this.logAggregationContext.getExcludePattern() != null
-            && !this.logAggregationContext.getExcludePattern().isEmpty()) {
-          filterFiles(this.logAggregationContext.getExcludePattern(),
-              candidates, true);
-        }
+        filterFiles(
+          this.appFinished ? this.logAggregationContext.getExcludePattern()
+              : this.logAggregationContext.getRolledLogsExcludePattern(),
+          candidates, true);
 
         Iterable<File> mask =
             Iterables.filter(candidates, new Predicate<File>() {
@@ -322,14 +322,15 @@ public class AggregatedLogFormat {
 
     private void filterFiles(String pattern, Set<File> candidates,
         boolean exclusion) {
-      Pattern filterPattern =
-          Pattern.compile(pattern);
-      for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
+      if (pattern != null && !pattern.isEmpty()) {
+        Pattern filterPattern = Pattern.compile(pattern);
+        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
           .hasNext();) {
-        File candidate = candidatesItr.next();
-        boolean match = filterPattern.matcher(candidate.getName()).find();
-        if ((!match && !exclusion) || (match && exclusion)) {
-          candidatesItr.remove();
+          File candidate = candidatesItr.next();
+          boolean match = filterPattern.matcher(candidate.getName()).find();
+          if ((!match && !exclusion) || (match && exclusion)) {
+            candidatesItr.remove();
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 65c5501..a0f3c48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -116,6 +115,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final Context context;
   private final int retentionSize;
   private final long rollingMonitorInterval;
+  private final boolean logAggregationInRolling;
   private final NodeId nodeId;
 
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
@@ -191,9 +191,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       }
       this.rollingMonitorInterval = configuredRollingMonitorInterval;
     }
+    this.logAggregationInRolling =
+        this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
+            || this.logAggregationContext.getRolledLogsIncludePattern() == null
+            || this.logAggregationContext.getRolledLogsIncludePattern()
+              .isEmpty() ? false : true;
   }
 
-  private void uploadLogsForContainers() {
+  private void uploadLogsForContainers(boolean appFinished) {
     if (this.logAggregationDisabled) {
       return;
     }
@@ -260,7 +265,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           containerLogAggregators.put(container, aggregator);
         }
         Set<Path> uploadedFilePathsInThisCycle =
-            aggregator.doContainerLogAggregation(writer);
+            aggregator.doContainerLogAggregation(writer, appFinished);
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
         }
@@ -391,12 +396,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
-          if (this.rollingMonitorInterval > 0) {
+          if (logAggregationInRolling) {
             wait(this.rollingMonitorInterval * 1000);
             if (this.appFinishing.get() || this.aborted.get()) {
               break;
             }
-            uploadLogsForContainers();
+            uploadLogsForContainers(false);
           } else {
             wait(THREAD_SLEEP_TIME);
           }
@@ -412,7 +417,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
 
     // App is finished, upload the container logs.
-    uploadLogsForContainers();
+    uploadLogsForContainers(true);
 
     // Remove the local app-log-dirs
     List<Path> localAppLogDirs = new ArrayList<Path>();
@@ -521,7 +526,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       this.containerId = containerId;
     }
 
-    public Set<Path> doContainerLogAggregation(LogWriter writer) {
+    public Set<Path> doContainerLogAggregation(LogWriter writer,
+        boolean appFinished) {
       LOG.info("Uploading logs for container " + containerId
           + ". Current good log dirs are "
           + StringUtils.join(",", dirsHandler.getLogDirsForRead()));
@@ -529,7 +535,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       final LogValue logValue =
           new LogValue(dirsHandler.getLogDirsForRead(), containerId,
             userUgi.getShortUserName(), logAggregationContext,
-            this.uploadedFileMeta);
+            this.uploadedFileMeta, appFinished);
       try {
         writer.append(logKey, logValue);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index a73d583..c45ffbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -130,8 +130,10 @@ public class TestContainerManagerRecovery {
         containerTokens, acls);
     // create the logAggregationContext
     LogAggregationContext logAggregationContext =
-        LogAggregationContext.newInstance("includePattern", "excludePattern");
-    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        LogAggregationContext.newInstance("includePattern", "excludePattern",
+          "includePatternInRollingAggregation",
+          "excludePatternInRollingAggregation");
+   StartContainersResponse startResponse = startContainer(context, cm, cid,
         clc, logAggregationContext);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
@@ -171,6 +173,10 @@ public class TestContainerManagerRecovery {
       recovered.getIncludePattern());
     assertEquals(logAggregationContext.getExcludePattern(),
       recovered.getExcludePattern());
+    assertEquals(logAggregationContext.getRolledLogsIncludePattern(),
+      recovered.getRolledLogsIncludePattern());
+    assertEquals(logAggregationContext.getRolledLogsExcludePattern(),
+      recovered.getRolledLogsExcludePattern());
 
     waitForAppState(app, ApplicationState.INITING);
     assertTrue(context.getApplicationACLsManager().checkAccess(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index e2c45db..d81e195 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -730,7 +730,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     }
   }
 
-  private String verifyContainerLogs(LogAggregationService logAggregationService,
+  private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
       ApplicationId appId, ContainerId[] expectedContainerIds,
       String[] logFiles, int numOfContainerLogs, boolean multiLogs)
       throws IOException {
@@ -775,7 +775,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath());
     Assert.assertEquals(this.user, reader.getApplicationOwner());
     verifyAcls(reader.getApplicationAcls());
-    
+
+    List<String> fileTypes = new ArrayList<String>();
+
     try {
       Map<String, Map<String, String>> logMap =
           new HashMap<String, Map<String, String>>();
@@ -801,6 +803,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
             Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
             String fileType = writtenLines[0].substring(8);
+            fileTypes.add(fileType);
 
             Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
             String fileLengthStr = writtenLines[1].substring(10);
@@ -843,7 +846,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         Assert.assertEquals(0, thisContainerMap.size());
       }
       Assert.assertEquals(0, logMap.size());
-      return targetNodeFile.getPath().getName();
+      return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
     } finally {
       reader.close();
     }
@@ -1326,6 +1329,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       throws Exception {
     LogAggregationContext logAggregationContextWithInterval =
         Records.newRecord(LogAggregationContext.class);
+    // set IncludePattern/excludePattern in rolling fashion
+    // we expect all the logs except std_final will be uploaded
+    // when app is running. The std_final will be uploaded when
+    // the app finishes.
+    logAggregationContextWithInterval.setRolledLogsIncludePattern(".*");
+    logAggregationContextWithInterval.setRolledLogsExcludePattern("std_final");
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
       this.remoteRootLogDir.getAbsolutePath());
@@ -1379,9 +1388,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
       logAggregationContextWithInterval));
 
+    LogFileStatusInLastCycle logFileStatusInLastCycle = null;
     // Simulate log-file creation
-    String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
-    writeContainerLogs(appLogDir, container, logFiles1);
+    // create std_final in log directory which will not be aggregated
+    // until the app finishes.
+    String[] logFiles1WithFinalLog =
+        new String[] { "stdout", "stderr", "syslog", "std_final" };
+    String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
+    writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
 
     // Do log aggregation
     AppLogAggregatorImpl aggregator =
@@ -1396,10 +1410,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
         50, 1, false, null));
     }
-    String logFileInLastCycle = null;
     // Container logs should be uploaded
-    logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
+    logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
         new ContainerId[] { container }, logFiles1, 3, true);
+    for(String logFile : logFiles1) {
+      Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+        .contains(logFile));
+    }
+    // Make sure the std_final is not uploaded.
+    Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+      .contains("std_final"));
 
     Thread.sleep(2000);
 
@@ -1421,15 +1441,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     if (retentionSizeLimitation) {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
-        50, 1, true, logFileInLastCycle));
+        50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
     } else {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
         50, 2, false, null));
     }
     // Container logs should be uploaded
-    logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
+    logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
         new ContainerId[] { container }, logFiles2, 3, true);
 
+    for(String logFile : logFiles2) {
+      Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+        .contains(logFile));
+    }
+    // Make sure the std_final is not uploaded.
+    Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+      .contains("std_final"));
+
     Thread.sleep(2000);
 
     // create another logs
@@ -1443,13 +1471,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
     if (retentionSizeLimitation) {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
-        50, 1, true, logFileInLastCycle));
+        50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
     } else {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
         50, 3, false, null));
     }
+
+    // the app is finished. The log "std_final" should be aggregated this time.
+    String[] logFiles3WithFinalLog =
+        new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
     verifyContainerLogs(logAggregationService, application,
-      new ContainerId[] { container }, logFiles3, 3, true);
+      new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
     dispatcher.stop();
@@ -1557,4 +1589,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     return numOfLogsAvailable(logAggregationService, application, sizeLimited,
       lastLogFile) == expectNum;
   }
+
+  private static class LogFileStatusInLastCycle {
+    private String logFilePathInLastCycle;
+    private List<String> logFileTypesInLastCycle;
+
+    public LogFileStatusInLastCycle(String logFilePathInLastCycle,
+        List<String> logFileTypesInLastCycle) {
+      this.logFilePathInLastCycle = logFilePathInLastCycle;
+      this.logFileTypesInLastCycle = logFileTypesInLastCycle;
+    }
+
+    public String getLogFilePathInLastCycle() {
+      return this.logFilePathInLastCycle;
+    }
+
+    public List<String> getLogFileTypesInLastCycle() {
+      return this.logFileTypesInLastCycle;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a5b85634/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index a9f9420..c1bed05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -230,12 +230,18 @@ public class TestContainerAllocation {
     // create a not-null LogAggregationContext
     LogAggregationContext logAggregationContext =
         LogAggregationContext.newInstance(
-          "includePattern", "excludePattern");
+          "includePattern", "excludePattern",
+          "rolledLogsIncludePattern",
+          "rolledLogsExcludePattern");
     LogAggregationContext returned =
         getLogAggregationContextFromContainerToken(rm1, nm2,
           logAggregationContext);
     Assert.assertEquals("includePattern", returned.getIncludePattern());
     Assert.assertEquals("excludePattern", returned.getExcludePattern());
+    Assert.assertEquals("rolledLogsIncludePattern",
+      returned.getRolledLogsIncludePattern());
+    Assert.assertEquals("rolledLogsExcludePattern",
+      returned.getRolledLogsExcludePattern());
     rm1.stop();
   }
 


Mime
View raw message