hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [38/50] [abbrv] hadoop git commit: YARN-221. NM should provide a way for AM to tell it not to aggregate logs. Contributed by Ming Ma
Date Tue, 25 Aug 2015 22:22:32 GMT
YARN-221. NM should provide a way for AM to tell it not to aggregate
logs. Contributed by Ming Ma


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/37e1c3d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/37e1c3d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/37e1c3d8

Branch: refs/heads/HDFS-7240
Commit: 37e1c3d82a96d781e1c9982988b7de4aa5242d0c
Parents: 490bb5e
Author: Xuan <xgong@apache.org>
Authored: Sat Aug 22 16:25:24 2015 -0700
Committer: Xuan <xgong@apache.org>
Committed: Sat Aug 22 16:25:24 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/util/StringUtils.java     |  13 +-
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../yarn/api/records/LogAggregationContext.java |  95 +++
 .../hadoop/yarn/conf/YarnConfiguration.java     |   6 +
 .../api/ContainerLogAggregationPolicy.java      |  54 ++
 .../yarn/server/api/ContainerLogContext.java    |  71 ++
 .../src/main/proto/yarn_protos.proto            |   2 +
 .../impl/pb/LogAggregationContextPBImpl.java    |  40 ++
 .../ContainerLogsRetentionPolicy.java           |  15 +-
 .../src/main/resources/yarn-default.xml         |  24 +
 .../application/ApplicationImpl.java            |   5 +-
 .../AMOnlyLogAggregationPolicy.java             |  31 +
 ...AMOrFailedContainerLogAggregationPolicy.java |  35 +
 .../AbstractContainerLogAggregationPolicy.java  |  31 +
 .../logaggregation/AppLogAggregator.java        |   5 +-
 .../logaggregation/AppLogAggregatorImpl.java    | 131 ++--
 .../FailedContainerLogAggregationPolicy.java    |  33 +
 ...edOrKilledContainerLogAggregationPolicy.java |  30 +
 .../logaggregation/LogAggregationService.java   |  19 +-
 .../NoneContainerLogAggregationPolicy.java      |  30 +
 .../SampleContainerLogAggregationPolicy.java    | 124 ++++
 .../event/LogHandlerAppStartedEvent.java        |  15 +-
 .../containermanager/TestAuxServices.java       |   1 +
 .../TestLogAggregationService.java              | 677 ++++++++++++++++---
 .../TestNonAggregatingLogHandler.java           |  12 +-
 .../capacity/TestContainerAllocation.java       |  12 +-
 26 files changed, 1343 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
index 153270f..1107007 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java
@@ -315,7 +315,18 @@ public class StringUtils {
    * @return the arraylist of the comma seperated string values
    */
   public static String[] getStrings(String str){
-    Collection<String> values = getStringCollection(str);
+    String delim = ",";
+    return getStrings(str, delim);
+  }
+
+  /**
+   * Returns an arraylist of strings.
+   * @param str the string values
+   * @param delim delimiter to separate the values
+   * @return the arraylist of the seperated string values
+   */
+  public static String[] getStrings(String str, String delim){
+    Collection<String> values = getStringCollection(str, delim);
     if(values.size() == 0) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index cf7b67f..5904a31 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -178,6 +178,9 @@ Release 2.8.0 - UNRELEASED
     YARN-2923. Support configuration based NodeLabelsProvider Service in Distributed 
     Node Label Configuration Setup. (Naganarasimha G R)
 
+    YARN-221. NM should provide a way for AM to tell it not to aggregate logs.
+    (Ming Ma via xgong)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
index 9383004..5ac7d2d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/LogAggregationContext.java
@@ -54,6 +54,43 @@ import org.apache.hadoop.yarn.util.Records;
  *     name matches both the include and the exclude pattern, this file
  *     will be excluded eventually.
  *   </li>
+ *   <li>
+ *     policyClassName. The policy class name that implements
+ *     ContainerLogAggregationPolicy. At runtime, nodemanager will the policy
+ *     if a given container's log should be aggregated based on the
+ *     ContainerType and other runtime state such as exit code by calling
+ *     ContainerLogAggregationPolicy#shouldDoLogAggregation.
+ *     This is useful when the app only wants to aggregate logs of a subset of
+ *     containers. Here are the available policies. Please make sure to specify
+ *     the canonical name by prefixing org.apache.hadoop.yarn.server.
+ *     nodemanager.containermanager.logaggregation.
+ *     to the class simple name below.
+ *     NoneContainerLogAggregationPolicy: skip aggregation for all containers.
+ *     AllContainerLogAggregationPolicy: aggregate all containers.
+ *     AMOrFailedContainerLogAggregationPolicy: aggregate application master
+ *         or failed containers.
+ *     FailedOrKilledContainerLogAggregationPolicy: aggregate failed or killed
+ *         containers
+ *     FailedContainerLogAggregationPolicy: aggregate failed containers
+ *     AMOnlyLogAggregationPolicy: aggregate application master containers
+ *     SampleContainerLogAggregationPolicy: sample logs of successful worker
+ *         containers, in addition to application master and failed/killed
+ *         containers.
+ *     If it isn't specified, it will use the cluster-wide default policy
+ *     defined by configuration yarn.nodemanager.log-aggregation.policy.class.
+ *     The default value of yarn.nodemanager.log-aggregation.policy.class is
+ *     AllContainerLogAggregationPolicy.
+ *   </li>
+ *   <li>
+ *     policyParameters. The parameters passed to the policy class via
+ *     ContainerLogAggregationPolicy#parseParameters during the policy object
+ *     initialization. This is optional. Some policy class might use parameters
+ *     to adjust its settings. It is up to policy class to define the scheme of
+ *     parameters.
+ *     For example, SampleContainerLogAggregationPolicy supports the format of
+ *     "SR:0.5,MIN:50", which means sample rate of 50% beyond the first 50
+ *     successful worker containers.
+ *   </li>
  * </ul>
  *
  * @see ApplicationSubmissionContext
@@ -87,6 +124,23 @@ public abstract class LogAggregationContext {
     return context;
   }
 
+  @Public
+  @Unstable
+  public static LogAggregationContext newInstance(String includePattern,
+      String excludePattern, String rolledLogsIncludePattern,
+      String rolledLogsExcludePattern, String policyClassName,
+      String policyParameters) {
+    LogAggregationContext context =
+        Records.newRecord(LogAggregationContext.class);
+    context.setIncludePattern(includePattern);
+    context.setExcludePattern(excludePattern);
+    context.setRolledLogsIncludePattern(rolledLogsIncludePattern);
+    context.setRolledLogsExcludePattern(rolledLogsExcludePattern);
+    context.setLogAggregationPolicyClassName(policyClassName);
+    context.setLogAggregationPolicyParameters(policyParameters);
+    return context;
+  }
+
   /**
    * Get include pattern. This includePattern only takes affect
    * on logs that exist at the time of application finish.
@@ -164,4 +218,45 @@ public abstract class LogAggregationContext {
   @Unstable
   public abstract void setRolledLogsExcludePattern(
       String rolledLogsExcludePattern);
+
+  /**
+   * Get the log aggregation policy class.
+   *
+   * @return log aggregation policy class
+   */
+  @Public
+  @Unstable
+  public abstract String getLogAggregationPolicyClassName();
+
+  /**
+   * Set the log aggregation policy class.
+   *
+   * @param className
+   */
+  @Public
+  @Unstable
+  public abstract void setLogAggregationPolicyClassName(
+      String className);
+
+  /**
+   * Get the log aggregation policy parameters.
+   *
+   * @return log aggregation policy parameters
+   */
+  @Public
+  @Unstable
+  public abstract String getLogAggregationPolicyParameters();
+
+  /**
+   * Set the log aggregation policy parameters.
+   * There is no schema defined for the parameters string.
+   * It is up to the log aggregation policy class to decide how to parse
+   * the parameters string.
+   *
+   * @param parameters
+   */
+  @Public
+  @Unstable
+  public abstract void setLogAggregationPolicyParameters(
+      String parameters);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 55eac85..a18ef7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -1237,6 +1237,12 @@ public class YarnConfiguration extends Configuration {
       NM_RECOVERY_PREFIX + "supervised";
   public static final boolean DEFAULT_NM_RECOVERY_SUPERVISED = false;
 
+  public static final String NM_LOG_AGG_POLICY_CLASS =
+      NM_PREFIX + "log-aggregation.policy.class";
+
+  public static final String NM_LOG_AGG_POLICY_CLASS_PARAMETERS = NM_PREFIX
+      + "log-aggregation.policy.parameters";
+
   ////////////////////////////////
   // Web Proxy Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java
new file mode 100644
index 0000000..2acbcf2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogAggregationPolicy.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+
+/**
+ * This API is used by NodeManager to decide if a given container's logs
+ * should be aggregated at run time.
+ */
+@Public
+@Unstable
+public interface ContainerLogAggregationPolicy {
+
+  /**
+   * <p>
+   * The method used by the NodeManager log aggregation service
+   * to initial the policy object with parameters specified by the application
+   * or the cluster-wide setting.
+   * </p>
+   *
+   * @param parameters parameters with scheme defined by the policy class.
+   */
+  void parseParameters(String parameters);
+
+  /**
+   * <p>
+   * The method used by the NodeManager log aggregation service
+   * to ask the policy object if a given container's logs should be aggregated.
+   * </p>
+   *
+   * @param logContext ContainerLogContext
+   * @return Whether or not the container's logs should be aggregated.
+   */
+  boolean shouldDoLogAggregation(ContainerLogContext logContext);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java
new file mode 100644
index 0000000..ab3b75c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ContainerLogContext.java
@@ -0,0 +1,71 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Context class for {@link ContainerLogAggregationPolicy}.
+ */
+@Public
+@Unstable
+public class ContainerLogContext {
+  private final ContainerId containerId;
+  private final ContainerType containerType;
+  private int exitCode;
+
+  @Public
+  @Unstable
+  public ContainerLogContext(ContainerId containerId,
+      ContainerType containerType, int exitCode) {
+    this.containerId = containerId;
+    this.containerType = containerType;
+    this.exitCode = exitCode;
+  }
+
+  /**
+   * Get {@link ContainerId} of the container.
+   *
+   * @return the container ID
+   */
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  /**
+   * Get {@link ContainerType} the type of the container.
+   *
+   * @return the type of the container
+   */
+  public ContainerType getContainerType() {
+    return containerType;
+  }
+
+  /**
+   * Get the exit code of the container.
+   *
+   * @return the exit code
+   */
+  public int getExitCode() {
+    return exitCode;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 13d8365..1bd3dda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -343,6 +343,8 @@ message LogAggregationContextProto {
  optional string exclude_pattern = 2 [default = ""];
  optional string rolled_logs_include_pattern = 3 [default = ""];
  optional string rolled_logs_exclude_pattern = 4 [default = ".*"];
+ optional string log_aggregation_policy_class_name = 5;
+ optional string log_aggregation_policy_parameters = 6;
 }
 
 enum ApplicationAccessTypeProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
index f6409bb..14a50fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/LogAggregationContextPBImpl.java
@@ -155,4 +155,44 @@ public class LogAggregationContextPBImpl extends LogAggregationContext{
     }
     builder.setRolledLogsExcludePattern(rolledLogsExcludePattern);
   }
+
+  @Override
+  public String getLogAggregationPolicyClassName() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasLogAggregationPolicyClassName()) {
+      return null;
+    }
+    return p.getLogAggregationPolicyClassName();
+  }
+
+  @Override
+  public void setLogAggregationPolicyClassName(
+      String className) {
+    maybeInitBuilder();
+    if (className == null) {
+      builder.clearLogAggregationPolicyClassName();
+      return;
+    }
+    builder.setLogAggregationPolicyClassName(className);
+  }
+
+  @Override
+  public String getLogAggregationPolicyParameters() {
+    LogAggregationContextProtoOrBuilder p = viaProto ? proto : builder;
+    if (! p.hasLogAggregationPolicyParameters()) {
+      return null;
+    }
+    return p.getLogAggregationPolicyParameters();
+  }
+
+  @Override
+  public void setLogAggregationPolicyParameters(
+      String config) {
+    maybeInitBuilder();
+    if (config == null) {
+      builder.clearLogAggregationPolicyParameters();
+      return;
+    }
+    builder.setLogAggregationPolicyParameters(config);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java
index fa39f25..3e7cd5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ContainerLogsRetentionPolicy.java
@@ -16,14 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.logaggregation;
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 
 @Private
-/**
- * This API is not exposed to end-users yet.
- */
-public enum ContainerLogsRetentionPolicy {
-  APPLICATION_MASTER_ONLY, AM_AND_FAILED_CONTAINERS_ONLY, ALL_CONTAINERS 
-}
+public class AllContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 00a9fba..62ba599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2226,4 +2226,28 @@
     <value>0</value>
   </property>
 
+  <property>
+    <description>
+    The default log aggregation policy class. Applications can
+    override it via LogAggregationContext. This configuration can provide
+    some cluster-side default behavior so that if the application doesn't
+    specify any policy via LogAggregationContext administrators of the cluster
+    can adjust the policy globally.
+    </description>
+    <name>yarn.nodemanager.log-aggregation.policy.class</name>
+    <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AllContainerLogAggregationPolicy</value>
+  </property>
+
+  <property>
+    <description>
+    The default parameters for the log aggregation policy. Applications can
+    override it via LogAggregationContext. This configuration can provide
+    some cluster-side default behavior so that if the application doesn't
+    specify any policy via LogAggregationContext administrators of the cluster
+    can adjust the policy globally.
+    </description>
+    <name>yarn.nodemanager.log-aggregation.policy.parameters</name>
+    <value></value>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index e880c31..fbc8453 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
@@ -242,8 +241,8 @@ public class ApplicationImpl implements Application {
       app.logAggregationContext = initEvent.getLogAggregationContext();
       app.dispatcher.getEventHandler().handle(
           new LogHandlerAppStartedEvent(app.appId, app.user,
-              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
-              app.applicationACLs, app.logAggregationContext)); 
+              app.credentials, app.applicationACLs,
+              app.logAggregationContext));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java
new file mode 100644
index 0000000..1c740ee
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOnlyLogAggregationPolicy.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+
+@Private
+public class AMOnlyLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+   return logContext.getContainerType() == ContainerType.APPLICATION_MASTER;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java
new file mode 100644
index 0000000..faee004
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AMOrFailedContainerLogAggregationPolicy.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+
+@Private
+public class AMOrFailedContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    int exitCode = logContext.getExitCode();
+    return logContext.getContainerType() == ContainerType.APPLICATION_MASTER ||
+        (exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode()
+        && exitCode != ExitCode.TERMINATED.getExitCode());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java
new file mode 100644
index 0000000..8d9dc03
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AbstractContainerLogAggregationPolicy.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+
+// The class provides no-op implementation for parseParameters. Polices
+// that don't need parameters can derive from this class.
+@Private
+public abstract class AbstractContainerLogAggregationPolicy implements
+    ContainerLogAggregationPolicy {
+  public void parseParameters(String parameters) {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
index 0b72a39..83c5d5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
@@ -18,12 +18,11 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 
 public interface AppLogAggregator extends Runnable {
 
-  void startContainerLogAggregation(ContainerId containerId,
-      boolean wasContainerSuccessful);
+  void startContainerLogAggregation(ContainerLogContext logContext);
 
   void abortLogAggregation();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 654eb0b..742b8a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -56,9 +57,12 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -107,7 +111,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final UserGroupInformation userUgi;
   private final Path remoteNodeLogFileForApp;
   private final Path remoteNodeTmpLogFileForApp;
-  private final ContainerLogsRetentionPolicy retentionPolicy;
 
   private final BlockingQueue<ContainerId> pendingContainers;
   private final AtomicBoolean appFinishing = new AtomicBoolean();
@@ -128,12 +131,12 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
 
   private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();
+  private final ContainerLogAggregationPolicy logAggPolicy;
 
   public AppLogAggregatorImpl(Dispatcher dispatcher,
       DeletionService deletionService, Configuration conf,
       ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
       LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
-      ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext, Context context,
       FileContext lfs) {
@@ -146,7 +149,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.dirsHandler = dirsHandler;
     this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
     this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
-    this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
     this.appAcls = appAcls;
     this.lfs = lfs;
@@ -204,6 +206,66 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
             || this.logAggregationContext.getRolledLogsIncludePattern() == null
             || this.logAggregationContext.getRolledLogsIncludePattern()
               .isEmpty() ? false : true;
+    this.logAggPolicy = getLogAggPolicy(conf);
+  }
+
+  private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
+    ContainerLogAggregationPolicy policy = getLogAggPolicyInstance(conf);
+    String params = getLogAggPolicyParameters(conf);
+    if (params != null) {
+      policy.parseParameters(params);
+    }
+    return policy;
+  }
+
+  // Use the policy class specified in LogAggregationContext if available.
+  // Otherwise use the cluster-wide default policy class.
+  private ContainerLogAggregationPolicy getLogAggPolicyInstance(
+      Configuration conf) {
+    Class<? extends ContainerLogAggregationPolicy> policyClass = null;
+    if (this.logAggregationContext != null) {
+      String className =
+          this.logAggregationContext.getLogAggregationPolicyClassName();
+      if (className != null) {
+        try {
+          Class<?> policyFromContext = conf.getClassByName(className);
+          if (ContainerLogAggregationPolicy.class.isAssignableFrom(
+              policyFromContext)) {
+            policyClass = policyFromContext.asSubclass(
+                ContainerLogAggregationPolicy.class);
+          } else {
+            LOG.warn(this.appId + " specified invalid log aggregation policy " +
+                className);
+          }
+        } catch (ClassNotFoundException cnfe) {
+          // We don't fail the app if the policy class isn't valid.
+          LOG.warn(this.appId + " specified invalid log aggregation policy " +
+              className);
+        }
+      }
+    }
+    if (policyClass == null) {
+      policyClass = conf.getClass(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS,
+          AllContainerLogAggregationPolicy.class,
+              ContainerLogAggregationPolicy.class);
+    } else {
+      LOG.info(this.appId + " specifies ContainerLogAggregationPolicy of "
+          + policyClass);
+    }
+    return ReflectionUtils.newInstance(policyClass, conf);
+  }
+
+  // Use the policy parameters specified in LogAggregationContext if available.
+  // Otherwise use the cluster-wide default policy parameters.
+  private String getLogAggPolicyParameters(Configuration conf) {
+    String params = null;
+    if (this.logAggregationContext != null) {
+      params = this.logAggregationContext.getLogAggregationPolicyParameters();
+    }
+    if (params == null) {
+      params = conf.get(YarnConfiguration.NM_LOG_AGG_POLICY_CLASS_PARAMETERS);
+    }
+    return params;
   }
 
   private void uploadLogsForContainers(boolean appFinished) {
@@ -228,21 +290,22 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     // Create a set of Containers whose logs will be uploaded in this cycle.
     // It includes:
     // a) all containers in pendingContainers: those containers are finished
-    //    and satisfy the retentionPolicy.
+    //    and satisfy the ContainerLogAggregationPolicy.
     // b) some set of running containers: For all the Running containers,
-    // we have ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY,
-    // so simply set wasContainerSuccessful as true to
-    // bypass FAILED_CONTAINERS check and find the running containers 
-    // which satisfy the retentionPolicy.
+    //    we use exitCode of 0 to find those which satisfy the
+    //    ContainerLogAggregationPolicy.
     Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>();
     this.pendingContainers.drainTo(pendingContainerInThisCycle);
     Set<ContainerId> finishedContainers =
         new HashSet<ContainerId>(pendingContainerInThisCycle);
     if (this.context.getApplications().get(this.appId) != null) {
-      for (ContainerId container : this.context.getApplications()
-        .get(this.appId).getContainers().keySet()) {
-        if (shouldUploadLogs(container, true)) {
-          pendingContainerInThisCycle.add(container);
+      for (Container container : this.context.getApplications()
+        .get(this.appId).getContainers().values()) {
+        ContainerType containerType =
+            container.getContainerTokenIdentifier().getContainerType();
+        if (shouldUploadLogs(new ContainerLogContext(
+            container.getContainerId(), containerType, 0))) {
+          pendingContainerInThisCycle.add(container.getContainerId());
         }
       }
     }
@@ -506,46 +569,16 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
 
   // TODO: The condition: containerId.getId() == 1 to determine an AM container
   // is not always true.
-  private boolean shouldUploadLogs(ContainerId containerId,
-      boolean wasContainerSuccessful) {
-
-    // All containers
-    if (this.retentionPolicy
-        .equals(ContainerLogsRetentionPolicy.ALL_CONTAINERS)) {
-      return true;
-    }
-
-    // AM Container only
-    if (this.retentionPolicy
-        .equals(ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY)) {
-      if ((containerId.getContainerId()
-          & ContainerId.CONTAINER_ID_BITMASK)== 1) {
-        return true;
-      }
-      return false;
-    }
-
-    // AM + Failing containers
-    if (this.retentionPolicy
-        .equals(ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY)) {
-      if ((containerId.getContainerId()
-          & ContainerId.CONTAINER_ID_BITMASK) == 1) {
-        return true;
-      } else if(!wasContainerSuccessful) {
-        return true;
-      }
-      return false;
-    }
-    return false;
+  private boolean shouldUploadLogs(ContainerLogContext logContext) {
+    return logAggPolicy.shouldDoLogAggregation(logContext);
   }
 
   @Override
-  public void startContainerLogAggregation(ContainerId containerId,
-      boolean wasContainerSuccessful) {
-    if (shouldUploadLogs(containerId, wasContainerSuccessful)) {
-      LOG.info("Considering container " + containerId
+  public void startContainerLogAggregation(ContainerLogContext logContext) {
+    if (shouldUploadLogs(logContext)) {
+      LOG.info("Considering container " + logContext.getContainerId()
           + " for log-aggregation");
-      this.pendingContainers.add(containerId);
+      this.pendingContainers.add(logContext.getContainerId());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java
new file mode 100644
index 0000000..800315e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedContainerLogAggregationPolicy.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+
+@Private
+public class FailedContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    int exitCode = logContext.getExitCode();
+    return exitCode != 0 && exitCode != ExitCode.FORCE_KILLED.getExitCode()
+        && exitCode != ExitCode.TERMINATED.getExitCode();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java
new file mode 100644
index 0000000..02a48ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/FailedOrKilledContainerLogAggregationPolicy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+
+@Private
+public class FailedOrKilledContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    return logContext.getExitCode() != 0;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index dbbfcd5..259e9ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -48,8 +48,9 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -314,13 +315,12 @@ public class LogAggregationService extends AbstractService implements
 
   @SuppressWarnings("unchecked")
   private void initApp(final ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
-      Map<ApplicationAccessType, String> appAcls,
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
     ApplicationEvent eventResponse;
     try {
       verifyAndCreateRemoteLogDir(getConfig());
-      initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls,
+      initAppAggregator(appId, user, credentials, appAcls,
           logAggregationContext);
       eventResponse = new ApplicationEvent(appId,
           ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
@@ -342,8 +342,7 @@ public class LogAggregationService extends AbstractService implements
 
 
   protected void initAppAggregator(final ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
-      Map<ApplicationAccessType, String> appAcls,
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
 
     // Get user's FileSystem credentials
@@ -357,7 +356,7 @@ public class LogAggregationService extends AbstractService implements
     final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
-            getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
+            getRemoteNodeLogFileForApp(appId, user),
             appAcls, logAggregationContext, this.context,
             getLocalFileContext(getConfig()));
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
@@ -420,7 +419,10 @@ public class LogAggregationService extends AbstractService implements
           + ", did it fail to start?");
       return;
     }
-    aggregator.startContainerLogAggregation(containerId, exitCode == 0);
+    ContainerType containerType = context.getContainers().get(
+        containerId).getContainerTokenIdentifier().getContainerType();
+    aggregator.startContainerLogAggregation(
+        new ContainerLogContext(containerId, containerType, exitCode));
   }
 
   private void stopApp(ApplicationId appId) {
@@ -445,7 +447,6 @@ public class LogAggregationService extends AbstractService implements
             (LogHandlerAppStartedEvent) event;
         initApp(appStartEvent.getApplicationId(), appStartEvent.getUser(),
             appStartEvent.getCredentials(),
-            appStartEvent.getLogRetentionPolicy(),
             appStartEvent.getApplicationAcls(),
             appStartEvent.getLogAggregationContext());
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java
new file mode 100644
index 0000000..86dcd54
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/NoneContainerLogAggregationPolicy.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+
+@Private
+public class NoneContainerLogAggregationPolicy extends
+    AbstractContainerLogAggregationPolicy {
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java
new file mode 100644
index 0000000..56c760b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/SampleContainerLogAggregationPolicy.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
+
+import java.util.Collection;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
+import org.apache.hadoop.yarn.server.api.ContainerLogContext;
+import org.apache.hadoop.yarn.server.api.ContainerType;
+
+/**
+ * The sample policy samples logs of successful worker containers to aggregate.
+ * It always aggregates AM container and failed/killed worker
+ * containers' logs. To make sure small applications have enough logs, it only
+ * applies sampling beyond minimal number of containers. The parameters can be
+ * configured by SAMPLE_RATE and MIN_THRESHOLD. For example if SAMPLE_RATE is
+ * 0.2 and MIN_THRESHOLD is 20, for an application with 100 successful
+ * worker containers, 20 + (100-20) * 0.2 = 36 containers's logs will be
+ * aggregated.
+ */
+@Private
+public class SampleContainerLogAggregationPolicy implements
+    ContainerLogAggregationPolicy  {
+  private static final Log LOG =
+      LogFactory.getLog(SampleContainerLogAggregationPolicy.class);
+
+  static String SAMPLE_RATE = "SR";
+  public static final float DEFAULT_SAMPLE_RATE = 0.2f;
+
+  static String MIN_THRESHOLD = "MIN";
+  public static final int DEFAULT_SAMPLE_MIN_THRESHOLD = 20;
+
+  private float sampleRate = DEFAULT_SAMPLE_RATE;
+  private int minThreshold = DEFAULT_SAMPLE_MIN_THRESHOLD;
+
+  static public String buildParameters(float sampleRate, int minThreshold) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(SAMPLE_RATE).append(":").append(sampleRate).append(",").
+        append(MIN_THRESHOLD).append(":").append(minThreshold);
+    return sb.toString();
+  }
+
+  // Parameters are comma separated properties, for example
+  // "SR:0.5,MIN:50"
+  public void parseParameters(String parameters) {
+    Collection<String> params = StringUtils.getStringCollection(parameters);
+    for(String param : params) {
+      // The first element is the property name.
+      // The second element is the property value.
+      String[] property = StringUtils.getStrings(param, ":");
+      if (property == null || property.length != 2) {
+        continue;
+      }
+      if (property[0].equals(SAMPLE_RATE)) {
+        try {
+          float sampleRate = Float.parseFloat(property[1]);
+          if (sampleRate >= 0.0 && sampleRate <= 1.0) {
+            this.sampleRate = sampleRate;
+          } else {
+            LOG.warn("The format isn't valid. Sample rate falls back to the " +
+                "default value " + DEFAULT_SAMPLE_RATE);
+          }
+        } catch (NumberFormatException nfe) {
+          LOG.warn("The format isn't valid. Sample rate falls back to the " +
+              "default value " + DEFAULT_SAMPLE_RATE);
+        }
+      } else if (property[0].equals(MIN_THRESHOLD)) {
+        try {
+          int minThreshold = Integer.parseInt(property[1]);
+          if (minThreshold >= 0) {
+            this.minThreshold = minThreshold;
+          } else {
+            LOG.warn("The format isn't valid. Min threshold falls back to " +
+                "the default value " + DEFAULT_SAMPLE_MIN_THRESHOLD);
+          }
+        } catch (NumberFormatException nfe) {
+          LOG.warn("The format isn't valid. Min threshold falls back to the " +
+              "default value " + DEFAULT_SAMPLE_MIN_THRESHOLD);
+        }
+      }
+    }
+  }
+
+  public boolean shouldDoLogAggregation(ContainerLogContext logContext) {
+    if (logContext.getContainerType() ==
+        ContainerType.APPLICATION_MASTER || logContext.getExitCode() != 0) {
+      // If it is AM or failed or killed container, enable log aggregation.
+      return true;
+    }
+
+    // Only sample log aggregation for large applications.
+    // We assume the container id is continuously allocated from number 1 and
+    // Worker containers start from id 2. So logs of worker containers with ids
+    // in [2, minThreshold + 1] will be aggregated.
+    if ((logContext.getContainerId().getContainerId() &
+        ContainerId.CONTAINER_ID_BITMASK) < minThreshold + 2) {
+      return true;
+    }
+
+    // Sample log aggregation for the rest of successful worker containers
+    return (sampleRate != 0 &&
+        logContext.getContainerId().hashCode() % (1/sampleRate) == 0);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
index 993f69c..d3ff771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerAppStartedEvent.java
@@ -24,32 +24,27 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
-import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 
 public class LogHandlerAppStartedEvent extends LogHandlerEvent {
 
   private final ApplicationId applicationId;
-  private final ContainerLogsRetentionPolicy retentionPolicy;
   private final String user;
   private final Credentials credentials;
   private final Map<ApplicationAccessType, String> appAcls;
   private final LogAggregationContext logAggregationContext;
 
   public LogHandlerAppStartedEvent(ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
-      Map<ApplicationAccessType, String> appAcls) {
-    this(appId, user, credentials, retentionPolicy, appAcls, null);
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls) {
+    this(appId, user, credentials, appAcls, null);
   }
 
   public LogHandlerAppStartedEvent(ApplicationId appId, String user,
-      Credentials credentials, ContainerLogsRetentionPolicy retentionPolicy,
-      Map<ApplicationAccessType, String> appAcls,
+      Credentials credentials, Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
     super(LogHandlerEventType.APPLICATION_STARTED);
     this.applicationId = appId;
     this.user = user;
     this.credentials = credentials;
-    this.retentionPolicy = retentionPolicy;
     this.appAcls = appAcls;
     this.logAggregationContext = logAggregationContext;
   }
@@ -62,10 +57,6 @@ public class LogHandlerAppStartedEvent extends LogHandlerEvent {
     return this.credentials;
   }
 
-  public ContainerLogsRetentionPolicy getLogRetentionPolicy() {
-    return this.retentionPolicy;
-  }
-
   public String getUser() {
     return this.user;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/37e1c3d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
index 757cdc8..1380752 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestAuxServices.java
@@ -87,6 +87,7 @@ public class TestAuxServices {
       this.stoppedApps = new ArrayList<Integer>();
     }
 
+    @SuppressWarnings("unchecked")
     public ArrayList<Integer> getAppIdsStopped() {
       return (ArrayList<Integer>)this.stoppedApps.clone();
     }


Mime
View raw message