hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject [47/49] hadoop git commit: YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating running logs of application when rolling is enabled. Contributed by Xuan Gong.
Date Thu, 12 Mar 2015 23:05:06 GMT
YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating running logs
of application when rolling is enabled. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/863079bb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/863079bb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/863079bb

Branch: refs/heads/YARN-2928
Commit: 863079bb874ba77918ca1c0741eae10e245995c8
Parents: b49c3a1
Author: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Authored: Thu Mar 12 13:32:29 2015 -0700
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Thu Mar 12 13:32:29 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../yarn/api/records/LogAggregationContext.java | 79 ++++++++++++++++++--
 .../src/main/proto/yarn_protos.proto            |  2 +
 .../impl/pb/LogAggregationContextPBImpl.java    | 39 ++++++++++
 .../logaggregation/AggregatedLogFormat.java     | 39 +++++-----
 .../logaggregation/AppLogAggregatorImpl.java    | 22 ++++--
 .../TestContainerManagerRecovery.java           | 10 ++-
 .../TestLogAggregationService.java              | 73 +++++++++++++++---
 .../capacity/TestContainerAllocation.java       |  8 +-
 9 files changed, 227 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 11d1cc9..80d2697 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -757,6 +757,9 @@ Release 2.7.0 - UNRELEASED
 
     YARN-3338. Exclude jline dependency from YARN. (Zhijie Shen via xgong)
 
+    YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
+    running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
index 46c1809..e582d2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
@@ -32,11 +32,20 @@ import org.apache.hadoop.yarn.util.Records;
  *   <ul>
  *     <li>includePattern. It uses Java Regex to filter the log files
  *     which match the defined include pattern and those log files
- *     will be uploaded. </li>
+ *     will be uploaded when the application finishes. </li>
  *     <li>excludePattern. It uses Java Regex to filter the log files
  *     which match the defined exclude pattern and those log files
- *     will not be uploaded. If the log file name matches both the
- *     include and the exclude pattern, this file will be excluded eventually</li>
+ *     will not be uploaded when application finishes. If the log file
+ *     name matches both the include and the exclude pattern, this file
+ *     will be excluded eventually</li>
+ *     <li>rolledLogsIncludePattern. It uses Java Regex to filter the log files
+ *     which match the defined include pattern and those log files
+ *     will be aggregated in a rolling fashion.</li>
+ *     <li>rolledLogsExcludePattern. It uses Java Regex to filter the log files
+ *     which match the defined exclude pattern and those log files
+ *     will not be aggregated in a rolling fashion. If the log file
+ *     name matches both the include and the exclude pattern, this file
+ *     will be excluded eventually</li>
  *   </ul>
  * </p>
  *
@@ -57,8 +66,23 @@ public abstract class LogAggregationContext {
     return context;
   }
 
+  @Public
+  @Unstable
+  public static LogAggregationContext newInstance(String includePattern,
+      String excludePattern, String rolledLogsIncludePattern,
+      String rolledLogsExcludePattern) {
+    LogAggregationContext context =
+        Records.newRecord(LogAggregationContext.class);
+    context.setIncludePattern(includePattern);
+    context.setExcludePattern(excludePattern);
+    context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+    context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+    return context;
+  }
+
   /**
-   * Get include pattern
+   * Get include pattern. This includePattern only takes affect
+   * on logs that exist at the time of application finish.
    *
    * @return include pattern
    */
@@ -67,7 +91,8 @@ public abstract class LogAggregationContext {
   public abstract String getIncludePattern();
 
   /**
-   * Set include pattern
+   * Set include pattern. This includePattern only takes affect
+   * on logs that exist at the time of application finish.
    *
    * @param includePattern
    */
@@ -76,7 +101,8 @@ public abstract class LogAggregationContext {
   public abstract void setIncludePattern(String includePattern);
 
   /**
-   * Get exclude pattern
+   * Get exclude pattern. This excludePattern only takes affect
+   * on logs that exist at the time of application finish.
    *
    * @return exclude pattern
    */
@@ -85,11 +111,50 @@ public abstract class LogAggregationContext {
   public abstract String getExcludePattern();
 
   /**
-   * Set exclude pattern
+   * Set exclude pattern. This excludePattern only takes affect
+   * on logs that exist at the time of application finish.
    *
    * @param excludePattern
    */
   @Public
   @Unstable
   public abstract void setExcludePattern(String excludePattern);
+
+  /**
+   * Get include pattern in a rolling fashion.
+   * 
+   * @return include pattern
+   */
+  @Public
+  @Unstable
+  public abstract String getRolledLogsIncludePattern();
+
+  /**
+   * Set include pattern in a rolling fashion.
+   * 
+   * @param rolledLogsIncludePattern
+   */
+  @Public
+  @Unstable
+  public abstract void setRolledLogsIncludePattern(
+      String rolledLogsIncludePattern);
+
+  /**
+   * Get exclude pattern for aggregation in a rolling fashion.
+   * 
+   * @return exclude pattern
+   */
+  @Public
+  @Unstable
+  public abstract String getRolledLogsExcludePattern();
+
+  /**
+   * Set exclude pattern for in a rolling fashion.
+   * 
+   * @param rolledLogsExcludePattern
+   */
+  @Public
+  @Unstable
+  public abstract void setRolledLogsExcludePattern(
+      String rolledLogsExcludePattern);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 90706ed..2edff99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -314,6 +314,8 @@ message ApplicationSubmissionContextProto {
 message LogAggregationContextProto {
  optional string include_pattern = 1 [default = ".*"];
  optional string exclude_pattern = 2 [default = ""];
+ optional string rolled_logs_include_pattern = 3 [default = ""];
+ optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
 }
 
 enum ApplicationAccessTypeProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
index dc7a21d..f6409bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.LogAggregationContextProtoOrBuilder;
+
 import com.google.protobuf.TextFormat;
 
 public class LogAggregationContextPBImpl extends LogAggregationContext{
@@ -116,4 +117,42 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
     }
     builder.setExcludePattern(excludePattern);
   }
+
+  @Override
+  public String getRolledLogsIncludePattern() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasRolledLogsIncludePattern()) {
+      return null;
+    }
+    return p.getRolledLogsIncludePattern();
+  }
+
+  @Override
+  public void setRolledLogsIncludePattern(String rolledLogsIncludePattern) {
+    maybeInitBuilder();
+    if (rolledLogsIncludePattern == null) {
+      builder.clearRolledLogsIncludePattern();
+      return;
+    }
+    builder.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+  }
+
+  @Override
+  public String getRolledLogsExcludePattern() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasRolledLogsExcludePattern()) {
+      return null;
+    }
+    return p.getRolledLogsExcludePattern();
+  }
+
+  @Override
+  public void setRolledLogsExcludePattern(String rolledLogsExcludePattern) {
+    maybeInitBuilder();
+    if (rolledLogsExcludePattern == null) {
+      builder.clearRolledLogsExcludePattern();
+      return;
+    }
+    builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index b669332..29122dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -167,17 +167,18 @@ public class AggregatedLogFormat {
     private Set<File> uploadedFiles = new HashSet<File>();
     private final Set<String> alreadyUploadedLogFiles;
     private Set<String> allExistingFileMeta = new HashSet<String>();
+    private final boolean appFinished;
     // TODO Maybe add a version string here. Instead of changing the version of
     // the entire k-v format
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user) {
-      this(rootLogDirs, containerId, user, null, new HashSet<String>());
+      this(rootLogDirs, containerId, user, null, new HashSet<String>(), true);
     }
 
     public LogValue(List<String> rootLogDirs, ContainerId containerId,
         String user, LogAggregationContext logAggregationContext,
-        Set<String> alreadyUploadedLogFiles) {
+        Set<String> alreadyUploadedLogFiles, boolean appFinished) {
       this.rootLogDirs = new ArrayList<String>(rootLogDirs);
       this.containerId = containerId;
       this.user = user;
@@ -186,6 +187,7 @@ public class AggregatedLogFormat {
       Collections.sort(this.rootLogDirs);
       this.logAggregationContext = logAggregationContext;
       this.alreadyUploadedLogFiles = alreadyUploadedLogFiles;
+      this.appFinished = appFinished;
     }
 
     private Set<File> getPendingLogFilesToUploadForThisContainer() {
@@ -296,17 +298,15 @@ public class AggregatedLogFormat {
       }
 
       if (this.logAggregationContext != null && candidates.size() > 0) {
-        if (this.logAggregationContext.getIncludePattern() != null
-            && !this.logAggregationContext.getIncludePattern().isEmpty()) {
-          filterFiles(this.logAggregationContext.getIncludePattern(),
-              candidates, false);
-        }
+        filterFiles(
+          this.appFinished ? this.logAggregationContext.getIncludePattern()
+              : this.logAggregationContext.getRolledLogsIncludePattern(),
+          candidates, false);
 
-        if (this.logAggregationContext.getExcludePattern() != null
-            && !this.logAggregationContext.getExcludePattern().isEmpty()) {
-          filterFiles(this.logAggregationContext.getExcludePattern(),
-              candidates, true);
-        }
+        filterFiles(
+          this.appFinished ? this.logAggregationContext.getExcludePattern()
+              : this.logAggregationContext.getRolledLogsExcludePattern(),
+          candidates, true);
 
         Iterable<File> mask =
             Iterables.filter(candidates, new Predicate<File>() {
@@ -323,14 +323,15 @@ public class AggregatedLogFormat {
 
     private void filterFiles(String pattern, Set<File> candidates,
         boolean exclusion) {
-      Pattern filterPattern =
-          Pattern.compile(pattern);
-      for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
+      if (pattern != null && !pattern.isEmpty()) {
+        Pattern filterPattern = Pattern.compile(pattern);
+        for (Iterator<File> candidatesItr = candidates.iterator(); candidatesItr
           .hasNext();) {
-        File candidate = candidatesItr.next();
-        boolean match = filterPattern.matcher(candidate.getName()).find();
-        if ((!match && !exclusion) || (match && exclusion)) {
-          candidatesItr.remove();
+          File candidate = candidatesItr.next();
+          boolean match = filterPattern.matcher(candidate.getName()).find();
+          if ((!match && !exclusion) || (match && exclusion)) {
+            candidatesItr.remove();
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 787422b..ff70a68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -44,7 +44,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -116,6 +115,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final Context context;
   private final int retentionSize;
   private final long rollingMonitorInterval;
+  private final boolean logAggregationInRolling;
   private final NodeId nodeId;
   // This variable is only for testing
   private final AtomicBoolean waiting = new AtomicBoolean(false);
@@ -193,9 +193,14 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       }
       this.rollingMonitorInterval = configuredRollingMonitorInterval;
     }
+    this.logAggregationInRolling =
+        this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
+            || this.logAggregationContext.getRolledLogsIncludePattern() == null
+            || this.logAggregationContext.getRolledLogsIncludePattern()
+              .isEmpty() ? false : true;
   }
 
-  private void uploadLogsForContainers() {
+  private void uploadLogsForContainers(boolean appFinished) {
     if (this.logAggregationDisabled) {
       return;
     }
@@ -262,7 +267,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
           containerLogAggregators.put(container, aggregator);
         }
         Set<Path> uploadedFilePathsInThisCycle =
-            aggregator.doContainerLogAggregation(writer);
+            aggregator.doContainerLogAggregation(writer, appFinished);
         if (uploadedFilePathsInThisCycle.size() > 0) {
           uploadedLogsInThisCycle = true;
         }
@@ -394,12 +399,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       synchronized(this) {
         try {
           waiting.set(true);
-          if (this.rollingMonitorInterval > 0) {
+          if (logAggregationInRolling) {
             wait(this.rollingMonitorInterval * 1000);
             if (this.appFinishing.get() || this.aborted.get()) {
               break;
             }
-            uploadLogsForContainers();
+            uploadLogsForContainers(false);
           } else {
             wait(THREAD_SLEEP_TIME);
           }
@@ -415,7 +420,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     }
 
     // App is finished, upload the container logs.
-    uploadLogsForContainers();
+    uploadLogsForContainers(true);
 
     // Remove the local app-log-dirs
     List<Path> localAppLogDirs = new ArrayList<Path>();
@@ -536,7 +541,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       this.containerId = containerId;
     }
 
-    public Set<Path> doContainerLogAggregation(LogWriter writer) {
+    public Set<Path> doContainerLogAggregation(LogWriter writer,
+        boolean appFinished) {
       LOG.info("Uploading logs for container " + containerId
           + ". Current good log dirs are "
           + StringUtils.join(",", dirsHandler.getLogDirs()));
@@ -544,7 +550,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       final LogValue logValue =
           new LogValue(dirsHandler.getLogDirs(), containerId,
             userUgi.getShortUserName(), logAggregationContext,
-            this.uploadedFileMeta);
+            this.uploadedFileMeta, appFinished);
       try {
         writer.append(logKey, logValue);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index a73d583..c45ffbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -130,8 +130,10 @@ public class TestContainerManagerRecovery {
         containerTokens, acls);
     // create the logAggregationContext
     LogAggregationContext logAggregationContext =
-        LogAggregationContext.newInstance("includePattern", "excludePattern");
-    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        LogAggregationContext.newInstance("includePattern", "excludePattern",
+          "includePatternInRollingAggregation",
+          "excludePatternInRollingAggregation");
+   StartContainersResponse startResponse = startContainer(context, cm, cid,
         clc, logAggregationContext);
     assertTrue(startResponse.getFailedRequests().isEmpty());
     assertEquals(1, context.getApplications().size());
@@ -171,6 +173,10 @@ public class TestContainerManagerRecovery {
       recovered.getIncludePattern());
     assertEquals(logAggregationContext.getExcludePattern(),
       recovered.getExcludePattern());
+    assertEquals(logAggregationContext.getRolledLogsIncludePattern(),
+      recovered.getRolledLogsIncludePattern());
+    assertEquals(logAggregationContext.getRolledLogsExcludePattern(),
+      recovered.getRolledLogsExcludePattern());
 
     waitForAppState(app, ApplicationState.INITING);
     assertTrue(context.getApplicationACLsManager().checkAccess(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 901e45a..df51a0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -698,7 +698,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
     }
   }
 
-  private String verifyContainerLogs(LogAggregationService logAggregationService,
+  private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService,
       ApplicationId appId, ContainerId[] expectedContainerIds,
       String[] logFiles, int numOfContainerLogs, boolean multiLogs)
       throws IOException {
@@ -743,7 +743,9 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
         new AggregatedLogFormat.LogReader(this.conf, targetNodeFile.getPath());
     Assert.assertEquals(this.user, reader.getApplicationOwner());
     verifyAcls(reader.getApplicationAcls());
-    
+
+    List<String> fileTypes = new ArrayList<String>();
+
     try {
       Map<String, Map<String, String>> logMap =
           new HashMap<String, Map<String, String>>();
@@ -769,6 +771,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
 
             Assert.assertEquals("LogType:", writtenLines[0].substring(0, 8));
             String fileType = writtenLines[0].substring(8);
+            fileTypes.add(fileType);
 
             Assert.assertEquals("LogLength:", writtenLines[1].substring(0, 10));
             String fileLengthStr = writtenLines[1].substring(10);
@@ -811,7 +814,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
         Assert.assertEquals(0, thisContainerMap.size());
       }
       Assert.assertEquals(0, logMap.size());
-      return targetNodeFile.getPath().getName();
+      return new LogFileStatusInLastCycle(targetNodeFile.getPath().getName(), fileTypes);
     } finally {
       reader.close();
     }
@@ -1289,6 +1292,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
       throws Exception {
     LogAggregationContext logAggregationContextWithInterval =
         Records.newRecord(LogAggregationContext.class);
+    // set IncludePattern/excludePattern in rolling fashion
+    // we expect all the logs except std_final will be uploaded
+    // when app is running. The std_final will be uploaded when
+    // the app finishes.
+    logAggregationContextWithInterval.setRolledLogsIncludePattern(".*");
+    logAggregationContextWithInterval.setRolledLogsExcludePattern("std_final");
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
       this.remoteRootLogDir.getAbsolutePath());
@@ -1338,9 +1347,14 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
       this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls,
       logAggregationContextWithInterval));
 
+    LogFileStatusInLastCycle logFileStatusInLastCycle = null;
     // Simulate log-file creation
-    String[] logFiles1 = new String[] { "stdout", "stderr", "syslog" };
-    writeContainerLogs(appLogDir, container, logFiles1);
+    // create std_final in log directory which will not be aggregated
+    // until the app finishes.
+    String[] logFiles1WithFinalLog =
+        new String[] { "stdout", "stderr", "syslog", "std_final" };
+    String[] logFiles1 = new String[] { "stdout", "stderr", "syslog"};
+    writeContainerLogs(appLogDir, container, logFiles1WithFinalLog);
 
     // Do log aggregation
     AppLogAggregatorImpl aggregator =
@@ -1355,10 +1369,16 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
         50, 1, false, null));
     }
-    String logFileInLastCycle = null;
     // Container logs should be uploaded
-    logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
+    logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
         new ContainerId[] { container }, logFiles1, 3, true);
+    for(String logFile : logFiles1) {
+      Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+        .contains(logFile));
+    }
+    // Make sure the std_final is not uploaded.
+    Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+      .contains("std_final"));
 
     Thread.sleep(2000);
 
@@ -1380,15 +1400,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
 
     if (retentionSizeLimitation) {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
-        50, 1, true, logFileInLastCycle));
+        50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
     } else {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
         50, 2, false, null));
     }
     // Container logs should be uploaded
-    logFileInLastCycle = verifyContainerLogs(logAggregationService, application,
+    logFileStatusInLastCycle = verifyContainerLogs(logAggregationService, application,
         new ContainerId[] { container }, logFiles2, 3, true);
 
+    for(String logFile : logFiles2) {
+      Assert.assertTrue(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+        .contains(logFile));
+    }
+    // Make sure the std_final is not uploaded.
+    Assert.assertFalse(logFileStatusInLastCycle.getLogFileTypesInLastCycle()
+      .contains("std_final"));
+
     Thread.sleep(2000);
 
     // create another logs
@@ -1402,13 +1430,17 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
     logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
     if (retentionSizeLimitation) {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
-        50, 1, true, logFileInLastCycle));
+        50, 1, true, logFileStatusInLastCycle.getLogFilePathInLastCycle()));
     } else {
       Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
         50, 3, false, null));
     }
+
+    // the app is finished. The log "std_final" should be aggregated this time.
+    String[] logFiles3WithFinalLog =
+        new String[] { "stdout_2", "stderr_2", "syslog_2", "std_final" };
     verifyContainerLogs(logAggregationService, application,
-      new ContainerId[] { container }, logFiles3, 3, true);
+      new ContainerId[] { container }, logFiles3WithFinalLog, 4, true);
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
   }
@@ -1512,4 +1544,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest
{
     return numOfLogsAvailable(logAggregationService, application, sizeLimited,
       lastLogFile) == expectNum;
   }
+
+  private static class LogFileStatusInLastCycle {
+    private String logFilePathInLastCycle;
+    private List<String> logFileTypesInLastCycle;
+
+    public LogFileStatusInLastCycle(String logFilePathInLastCycle,
+        List<String> logFileTypesInLastCycle) {
+      this.logFilePathInLastCycle = logFilePathInLastCycle;
+      this.logFileTypesInLastCycle = logFileTypesInLastCycle;
+    }
+
+    public String getLogFilePathInLastCycle() {
+      return this.logFilePathInLastCycle;
+    }
+
+    public List<String> getLogFileTypesInLastCycle() {
+      return this.logFileTypesInLastCycle;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/863079bb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 169517d..0ad2957 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -230,12 +230,18 @@ public class TestContainerAllocation {
     // create a not-null LogAggregationContext
     LogAggregationContext logAggregationContext =
         LogAggregationContext.newInstance(
-          "includePattern", "excludePattern");
+          "includePattern", "excludePattern",
+          "rolledLogsIncludePattern",
+          "rolledLogsExcludePattern");
     LogAggregationContext returned =
         getLogAggregationContextFromContainerToken(rm1, nm2,
           logAggregationContext);
     Assert.assertEquals("includePattern", returned.getIncludePattern());
     Assert.assertEquals("excludePattern", returned.getExcludePattern());
+    Assert.assertEquals("rolledLogsIncludePattern",
+      returned.getRolledLogsIncludePattern());
+    Assert.assertEquals("rolledLogsExcludePattern",
+      returned.getRolledLogsExcludePattern());
     rm1.stop();
   }
 


Mime
View raw message