hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject [11/50] [abbrv] hadoop git commit: MAPREDUCE-6415. Create a tool to combine aggregated logs into HAR files. (Robert Kanter via kasha)
Date Tue, 15 Sep 2015 19:11:56 GMT
MAPREDUCE-6415. Create a tool to combine aggregated logs into HAR files. (Robert Kanter via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/119cc75e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/119cc75e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/119cc75e

Branch: refs/heads/HADOOP-11890
Commit: 119cc75e7ebd723790f6326498383304aba384a2
Parents: 4014ce5
Author: Karthik Kambatla <kasha@apache.org>
Authored: Wed Sep 9 17:41:27 2015 -0700
Committer: Karthik Kambatla <kasha@apache.org>
Committed: Wed Sep 9 17:45:19 2015 -0700

----------------------------------------------------------------------
 MAPREDUCE-6415.003.patch                        | 1308 ++++++++++++++++++
 .../main/resources/assemblies/hadoop-tools.xml  |    7 +
 hadoop-mapreduce-project/CHANGES.txt            |    3 +
 hadoop-mapreduce-project/bin/mapred             |    8 +
 hadoop-project/pom.xml                          |    5 +
 hadoop-tools/hadoop-archive-logs/pom.xml        |  171 +++
 .../apache/hadoop/tools/HadoopArchiveLogs.java  |  403 ++++++
 .../hadoop/tools/HadoopArchiveLogsRunner.java   |  180 +++
 .../hadoop/tools/TestHadoopArchiveLogs.java     |  293 ++++
 .../tools/TestHadoopArchiveLogsRunner.java      |  143 ++
 hadoop-tools/hadoop-tools-dist/pom.xml          |    5 +
 hadoop-tools/pom.xml                            |    1 +
 12 files changed, 2527 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/MAPREDUCE-6415.003.patch
----------------------------------------------------------------------
diff --git a/MAPREDUCE-6415.003.patch b/MAPREDUCE-6415.003.patch
new file mode 100644
index 0000000..7c14341
--- /dev/null
+++ b/MAPREDUCE-6415.003.patch
@@ -0,0 +1,1308 @@
+diff --git hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+index fa55703..3f646e6 100644
+--- hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
++++ hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+@@ -52,6 +52,13 @@
+       </includes>
+     </fileSet>
+     <fileSet>
++      <directory>../hadoop-archive-logs/target</directory>
++      <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
++      <includes>
++        <include>*-sources.jar</include>
++      </includes>
++    </fileSet>
++    <fileSet>
+       <directory>../hadoop-datajoin/target</directory>
+       <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
+       <includes>
+diff --git hadoop-mapreduce-project/bin/mapred hadoop-mapreduce-project/bin/mapred
+index 426af80..2d56a8d 100755
+--- hadoop-mapreduce-project/bin/mapred
++++ hadoop-mapreduce-project/bin/mapred
+@@ -20,6 +20,7 @@ MYNAME="${BASH_SOURCE-$0}"
+ function hadoop_usage
+ {
+   hadoop_add_subcommand "archive" "create a hadoop archive"
++  hadoop_add_subcommand "archive-logs" "combine aggregated logs into hadoop archives"
+   hadoop_add_subcommand "classpath" "prints the class path needed for running mapreduce subcommands"
+   hadoop_add_subcommand "distcp" "copy file or directories recursively"
+   hadoop_add_subcommand "historyserver" "run job history servers as a standalone daemon"
+@@ -72,6 +73,13 @@ case ${COMMAND} in
+     hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
+     HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+   ;;
++  archive-logs)
++    CLASS=org.apache.hadoop.tools.HadoopArchiveLogs
++    hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
++    hadoop_add_classpath "${TOOL_PATH}"
++    hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
++    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
++  ;;
+   classpath)
+     hadoop_do_classpath_subcommand CLASS "$@"
+   ;;
+diff --git hadoop-project/pom.xml hadoop-project/pom.xml
+index 9863475..636e063 100644
+--- hadoop-project/pom.xml
++++ hadoop-project/pom.xml
+@@ -324,6 +324,11 @@
+       </dependency>
+       <dependency>
+         <groupId>org.apache.hadoop</groupId>
++        <artifactId>hadoop-archive-logs</artifactId>
++        <version>${project.version}</version>
++      </dependency>
++      <dependency>
++        <groupId>org.apache.hadoop</groupId>
+         <artifactId>hadoop-distcp</artifactId>
+         <version>${project.version}</version>
+       </dependency>
+diff --git hadoop-tools/hadoop-archive-logs/pom.xml hadoop-tools/hadoop-archive-logs/pom.xml
+new file mode 100644
+index 0000000..2a480a8
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/pom.xml
+@@ -0,0 +1,171 @@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++  Licensed under the Apache License, Version 2.0 (the "License");
++  you may not use this file except in compliance with the License.
++  You may obtain a copy of the License at
++
++    http://www.apache.org/licenses/LICENSE-2.0
++
++  Unless required by applicable law or agreed to in writing, software
++  distributed under the License is distributed on an "AS IS" BASIS,
++  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++  See the License for the specific language governing permissions and
++  limitations under the License. See accompanying LICENSE file.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0"
++  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
++  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
++                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
++  <modelVersion>4.0.0</modelVersion>
++  <parent>
++    <groupId>org.apache.hadoop</groupId>
++    <artifactId>hadoop-project</artifactId>
++    <version>3.0.0-SNAPSHOT</version>
++    <relativePath>../../hadoop-project</relativePath>
++  </parent>
++  <groupId>org.apache.hadoop</groupId>
++  <artifactId>hadoop-archive-logs</artifactId>
++  <version>3.0.0-SNAPSHOT</version>
++  <description>Apache Hadoop Archive Logs</description>
++  <name>Apache Hadoop Archive Logs</name>
++  <packaging>jar</packaging>
++
++  <properties>
++    <hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
++  </properties>
++
++  <dependencies>
++    <dependency>
++      <groupId>junit</groupId>
++      <artifactId>junit</artifactId>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-mapreduce-client-core</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-yarn-applications-distributedshell</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-common</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-hdfs</artifactId>
++      <scope>test</scope>
++      <type>test-jar</type>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-yarn-server-tests</artifactId>
++      <type>test-jar</type>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-archives</artifactId>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-yarn-common</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-yarn-api</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>com.google.guava</groupId>
++      <artifactId>guava</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>commons-io</groupId>
++      <artifactId>commons-io</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>commons-logging</groupId>
++      <artifactId>commons-logging</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>commons-cli</groupId>
++      <artifactId>commons-cli</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-yarn-client</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
++      <scope>provided</scope>
++    </dependency>
++    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-hdfs</artifactId>
++      <scope>test</scope>
++    </dependency>
++    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-common</artifactId>
++      <scope>test</scope>
++      <type>test-jar</type>
++    </dependency>
++    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
++      <scope>test</scope>
++      <type>test-jar</type>
++    </dependency>
++  </dependencies>
++
++  <build>
++    <plugins>
++      <plugin>
++        <groupId>org.apache.maven.plugins</groupId>
++        <artifactId>maven-antrun-plugin</artifactId>
++        <executions>
++          <execution>
++            <id>create-log-dir</id>
++            <phase>process-test-resources</phase>
++            <goals>
++              <goal>run</goal>
++            </goals>
++            <configuration>
++              <target>
++                <delete dir="${test.build.data}"/>
++                <mkdir dir="${test.build.data}"/>
++                <mkdir dir="${hadoop.log.dir}"/>
++              </target>
++            </configuration>
++          </execution>
++        </executions>
++      </plugin>
++      <plugin>
++        <groupId>org.apache.maven.plugins</groupId>
++        <artifactId>maven-jar-plugin</artifactId>
++         <configuration>
++          <archive>
++           <manifest>
++            <mainClass>org.apache.hadoop.tools.HadoopArchiveLogs</mainClass>
++           </manifest>
++         </archive>
++        </configuration>
++       </plugin>
++    </plugins>
++  </build>
++</project>
+diff --git hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
+new file mode 100644
+index 0000000..4778dcb
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
+@@ -0,0 +1,403 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import com.google.common.annotations.VisibleForTesting;
++import org.apache.commons.cli.CommandLine;
++import org.apache.commons.cli.CommandLineParser;
++import org.apache.commons.cli.GnuParser;
++import org.apache.commons.cli.HelpFormatter;
++import org.apache.commons.cli.Option;
++import org.apache.commons.cli.Options;
++import org.apache.commons.cli.ParseException;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.permission.FsAction;
++import org.apache.hadoop.fs.permission.FsPermission;
++import org.apache.hadoop.mapred.JobConf;
++import org.apache.hadoop.util.Tool;
++import org.apache.hadoop.util.ToolRunner;
++import org.apache.hadoop.yarn.api.records.ApplicationReport;
++import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
++import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
++import org.apache.hadoop.yarn.applications.distributedshell.Client;
++import org.apache.hadoop.yarn.client.api.YarnClient;
++import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.hadoop.yarn.exceptions.YarnException;
++import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
++
++import java.io.File;
++import java.io.FileWriter;
++import java.io.IOException;
++import java.util.ArrayList;
++import java.util.Collections;
++import java.util.Comparator;
++import java.util.HashSet;
++import java.util.Iterator;
++import java.util.List;
++import java.util.Set;
++
++
++/**
++ * This tool moves Aggregated Log files into HAR archives using the
++ * {@link HadoopArchives} tool and the Distributed Shell via the
++ * {@link HadoopArchiveLogsRunner}.
++ */
++public class HadoopArchiveLogs implements Tool {
++  private static final Log LOG = LogFactory.getLog(HadoopArchiveLogs.class);
++
++  private static final String HELP_OPTION = "help";
++  private static final String MAX_ELIGIBLE_APPS_OPTION = "maxEligibleApps";
++  private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
++  private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
++  private static final String MEMORY_OPTION = "memory";
++
++  private static final int DEFAULT_MAX_ELIGIBLE = -1;
++  private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
++  private static final long DEFAULT_MAX_TOTAL_LOGS_SIZE = 1024L;
++  private static final long DEFAULT_MEMORY = 1024L;
++
++  @VisibleForTesting
++  int maxEligible = DEFAULT_MAX_ELIGIBLE;
++  @VisibleForTesting
++  int minNumLogFiles = DEFAULT_MIN_NUM_LOG_FILES;
++  @VisibleForTesting
++  long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
++  @VisibleForTesting
++  long memory = DEFAULT_MEMORY;
++
++  @VisibleForTesting
++  Set<ApplicationReport> eligibleApplications;
++
++  private JobConf conf;
++
++  public HadoopArchiveLogs(Configuration conf) {
++    setConf(conf);
++    eligibleApplications = new HashSet<>();
++  }
++
++  public static void main(String[] args) {
++    JobConf job = new JobConf(HadoopArchiveLogs.class);
++
++    HadoopArchiveLogs hal = new HadoopArchiveLogs(job);
++    int ret = 0;
++
++    try{
++      ret = ToolRunner.run(hal, args);
++    } catch(Exception e) {
++      LOG.debug("Exception", e);
++      System.err.println(e.getClass().getSimpleName());
++      final String s = e.getLocalizedMessage();
++      if (s != null) {
++        System.err.println(s);
++      } else {
++        e.printStackTrace(System.err);
++      }
++      System.exit(1);
++    }
++    System.exit(ret);
++  }
++
++  @Override
++  public int run(String[] args) throws Exception {
++    handleOpts(args);
++
++    findAggregatedApps();
++
++    FileSystem fs = null;
++    Path remoteRootLogDir = new Path(conf.get(
++        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
++        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
++    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
++    Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
++    try {
++      fs = FileSystem.get(conf);
++      checkFiles(fs, remoteRootLogDir, suffix);
++
++      // Prepare working directory
++      if (fs.exists(workingDir)) {
++        fs.delete(workingDir, true);
++      }
++      fs.mkdirs(workingDir);
++      fs.setPermission(workingDir,
++          new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
++    } finally {
++      if (fs != null) {
++        fs.close();
++      }
++    }
++
++    checkMaxEligible();
++
++    if (eligibleApplications.isEmpty()) {
++      LOG.info("No eligible applications to process");
++      System.exit(0);
++    }
++
++    StringBuilder sb =
++        new StringBuilder("Will process the following applications:");
++    for (ApplicationReport report : eligibleApplications) {
++      sb.append("\n\t").append(report.getApplicationId());
++    }
++    LOG.info(sb.toString());
++
++    File localScript = File.createTempFile("hadoop-archive-logs-", ".sh");
++    generateScript(localScript, workingDir, remoteRootLogDir, suffix);
++
++    if (runDistributedShell(localScript)) {
++      return 0;
++    }
++    return -1;
++  }
++
++  private void handleOpts(String[] args) throws ParseException {
++    Options opts = new Options();
++    Option helpOpt = new Option(HELP_OPTION, false, "Prints this message");
++    Option maxEligibleOpt = new Option(MAX_ELIGIBLE_APPS_OPTION, true,
++        "The maximum number of eligible apps to process (default: "
++            + DEFAULT_MAX_ELIGIBLE + " (all))");
++    maxEligibleOpt.setArgName("n");
++    Option minNumLogFilesOpt = new Option(MIN_NUM_LOG_FILES_OPTION, true,
++        "The minimum number of log files required to be eligible (default: "
++            + DEFAULT_MIN_NUM_LOG_FILES + ")");
++    minNumLogFilesOpt.setArgName("n");
++    Option maxTotalLogsSizeOpt = new Option(MAX_TOTAL_LOGS_SIZE_OPTION, true,
++        "The maximum total logs size (in megabytes) required to be eligible" +
++            " (default: " + DEFAULT_MAX_TOTAL_LOGS_SIZE + ")");
++    maxTotalLogsSizeOpt.setArgName("megabytes");
++    Option memoryOpt = new Option(MEMORY_OPTION, true,
++        "The amount of memory (in megabytes) for each container (default: "
++            + DEFAULT_MEMORY + ")");
++    memoryOpt.setArgName("megabytes");
++    opts.addOption(helpOpt);
++    opts.addOption(maxEligibleOpt);
++    opts.addOption(minNumLogFilesOpt);
++    opts.addOption(maxTotalLogsSizeOpt);
++    opts.addOption(memoryOpt);
++
++    try {
++      CommandLineParser parser = new GnuParser();
++      CommandLine commandLine = parser.parse(opts, args);
++      if (commandLine.hasOption(HELP_OPTION)) {
++        HelpFormatter formatter = new HelpFormatter();
++        formatter.printHelp("yarn archive-logs", opts);
++        System.exit(0);
++      }
++      if (commandLine.hasOption(MAX_ELIGIBLE_APPS_OPTION)) {
++        maxEligible = Integer.parseInt(
++            commandLine.getOptionValue(MAX_ELIGIBLE_APPS_OPTION));
++        if (maxEligible == 0) {
++          LOG.info("Setting " + MAX_ELIGIBLE_APPS_OPTION + " to 0 accomplishes "
++              + "nothing. Please either set it to a negative value "
++              + "(default, all) or a more reasonable value.");
++          System.exit(0);
++        }
++      }
++      if (commandLine.hasOption(MIN_NUM_LOG_FILES_OPTION)) {
++        minNumLogFiles = Integer.parseInt(
++            commandLine.getOptionValue(MIN_NUM_LOG_FILES_OPTION));
++      }
++      if (commandLine.hasOption(MAX_TOTAL_LOGS_SIZE_OPTION)) {
++        maxTotalLogsSize = Long.parseLong(
++            commandLine.getOptionValue(MAX_TOTAL_LOGS_SIZE_OPTION));
++        maxTotalLogsSize *= 1024L * 1024L;
++      }
++      if (commandLine.hasOption(MEMORY_OPTION)) {
++        memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
++      }
++    } catch (ParseException pe) {
++      HelpFormatter formatter = new HelpFormatter();
++      formatter.printHelp("yarn archive-logs", opts);
++      throw pe;
++    }
++  }
++
++  @VisibleForTesting
++  void findAggregatedApps() throws IOException, YarnException {
++    YarnClient client = YarnClient.createYarnClient();
++    try {
++      client.init(getConf());
++      client.start();
++      List<ApplicationReport> reports = client.getApplications();
++      for (ApplicationReport report : reports) {
++        LogAggregationStatus aggStatus = report.getLogAggregationStatus();
++        if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
++            aggStatus.equals(LogAggregationStatus.FAILED)) {
++          eligibleApplications.add(report);
++        }
++      }
++    } finally {
++      if (client != null) {
++        client.stop();
++      }
++    }
++  }
++
++  @VisibleForTesting
++  void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
++    for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
++         reportIt.hasNext(); ) {
++      ApplicationReport report = reportIt.next();
++      long totalFileSize = 0L;
++      try {
++        FileStatus[] files = fs.listStatus(
++            LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
++                report.getApplicationId(), report.getUser(), suffix));
++        if (files.length < minNumLogFiles) {
++          reportIt.remove();
++        } else {
++          for (FileStatus file : files) {
++            if (file.getPath().getName().equals(report.getApplicationId()
++                + ".har")) {
++              reportIt.remove();
++              break;
++            }
++            totalFileSize += file.getLen();
++          }
++          if (totalFileSize > maxTotalLogsSize) {
++            reportIt.remove();
++          }
++        }
++      } catch (IOException ioe) {
++        // If the user doesn't have permission or it doesn't exist, then skip it
++        reportIt.remove();
++      }
++    }
++  }
++
++  @VisibleForTesting
++  void checkMaxEligible() {
++    // If we have too many eligible apps, remove the newest ones first
++    if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
++      List<ApplicationReport> sortedApplications =
++          new ArrayList<ApplicationReport>(eligibleApplications);
++      Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
++        @Override
++        public int compare(ApplicationReport o1, ApplicationReport o2) {
++          return Long.compare(o1.getFinishTime(), o2.getFinishTime());
++        }
++      });
++      for (int i = maxEligible; i < sortedApplications.size(); i++) {
++        eligibleApplications.remove(sortedApplications.get(i));
++      }
++    }
++  }
++
++  /*
++  The generated script looks like this:
++  #!/bin/bash
++  set -e
++  set -x
++  if [ "$YARN_SHELL_ID" == "1" ]; then
++        appId="application_1440448768987_0001"
++        user="rkanter"
++  elif [ "$YARN_SHELL_ID" == "2" ]; then
++        appId="application_1440448768987_0002"
++        user="rkanter"
++  else
++        echo "Unknown Mapping!"
++        exit 1
++  fi
++  export HADOOP_CLIENT_OPTS="-Xmx1024m"
++  export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar
++  "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs
++   */
++  @VisibleForTesting
++  void generateScript(File localScript, Path workingDir,
++        Path remoteRootLogDir, String suffix) throws IOException {
++    LOG.info("Generating script at: " + localScript.getAbsolutePath());
++    String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
++        .getCodeSource().getLocation().getPath();
++    String harJarPath = HadoopArchives.class.getProtectionDomain()
++        .getCodeSource().getLocation().getPath();
++    String classpath = halrJarPath + File.pathSeparator + harJarPath;
++    FileWriter fw = null;
++    try {
++      fw = new FileWriter(localScript);
++      fw.write("#!/bin/bash\nset -e\nset -x\n");
++      int containerCount = 1;
++      for (ApplicationReport report : eligibleApplications) {
++        fw.write("if [ \"$YARN_SHELL_ID\" == \"");
++        fw.write(Integer.toString(containerCount));
++        fw.write("\" ]; then\n\tappId=\"");
++        fw.write(report.getApplicationId().toString());
++        fw.write("\"\n\tuser=\"");
++        fw.write(report.getUser());
++        fw.write("\"\nel");
++        containerCount++;
++      }
++      fw.write("se\n\techo \"Unknown Mapping!\"\n\texit 1\nfi\n");
++      fw.write("export HADOOP_CLIENT_OPTS=\"-Xmx");
++      fw.write(Long.toString(memory));
++      fw.write("m\"\n");
++      fw.write("export HADOOP_CLASSPATH=");
++      fw.write(classpath);
++      fw.write("\n\"$HADOOP_HOME\"/bin/hadoop ");
++      fw.write(HadoopArchiveLogsRunner.class.getName());
++      fw.write(" -appId \"$appId\" -user \"$user\" -workingDir ");
++      fw.write(workingDir.toString());
++      fw.write(" -remoteRootLogDir ");
++      fw.write(remoteRootLogDir.toString());
++      fw.write(" -suffix ");
++      fw.write(suffix);
++      fw.write("\n");
++    } finally {
++      if (fw != null) {
++        fw.close();
++      }
++    }
++  }
++
++  private boolean runDistributedShell(File localScript) throws Exception {
++    String[] dsArgs = {
++        "--appname",
++        "ArchiveLogs",
++        "--jar",
++        ApplicationMaster.class.getProtectionDomain().getCodeSource()
++            .getLocation().getPath(),
++        "--num_containers",
++        Integer.toString(eligibleApplications.size()),
++        "--container_memory",
++        Long.toString(memory),
++        "--shell_script",
++        localScript.getAbsolutePath()
++    };
++    final Client dsClient = new Client(new Configuration(conf));
++    dsClient.init(dsArgs);
++    return dsClient.run();
++  }
++
++  @Override
++  public void setConf(Configuration conf) {
++    if (conf instanceof JobConf) {
++      this.conf = (JobConf) conf;
++    } else {
++      this.conf = new JobConf(conf, HadoopArchiveLogs.class);
++    }
++  }
++
++  @Override
++  public Configuration getConf() {
++    return this.conf;
++  }
++}
+diff --git hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
+new file mode 100644
+index 0000000..347e5fb
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
+@@ -0,0 +1,180 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import org.apache.commons.cli.CommandLine;
++import org.apache.commons.cli.CommandLineParser;
++import org.apache.commons.cli.GnuParser;
++import org.apache.commons.cli.Option;
++import org.apache.commons.cli.Options;
++import org.apache.commons.cli.ParseException;
++import org.apache.commons.logging.Log;
++import org.apache.commons.logging.LogFactory;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.fs.PathFilter;
++import org.apache.hadoop.mapred.JobConf;
++import org.apache.hadoop.util.Tool;
++import org.apache.hadoop.util.ToolRunner;
++
++import java.io.File;
++
++/**
++ * This is a child program designed to be used by the {@link HadoopArchiveLogs}
++ * tool via the Distributed Shell.  It's not meant to be run directly.
++ */
++public class HadoopArchiveLogsRunner implements Tool {
++  private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class);
++
++  private static final String APP_ID_OPTION = "appId";
++  private static final String USER_OPTION = "user";
++  private static final String WORKING_DIR_OPTION = "workingDir";
++  private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir";
++  private static final String SUFFIX_OPTION = "suffix";
++
++  private String appId;
++  private String user;
++  private String workingDir;
++  private String remoteLogDir;
++  private String suffix;
++
++  private JobConf conf;
++
++  public HadoopArchiveLogsRunner(Configuration conf) {
++    setConf(conf);
++  }
++
++  public static void main(String[] args) {
++    JobConf job = new JobConf(HadoopArchiveLogsRunner.class);
++
++    HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(job);
++    int ret = 0;
++
++    try{
++      ret = ToolRunner.run(halr, args);
++    } catch(Exception e) {
++      LOG.debug("Exception", e);
++      System.err.println(e.getClass().getSimpleName());
++      final String s = e.getLocalizedMessage();
++      if (s != null) {
++        System.err.println(s);
++      } else {
++        e.printStackTrace(System.err);
++      }
++      System.exit(1);
++    }
++    System.exit(ret);
++  }
++
++  @Override
++  public int run(String[] args) throws Exception {
++    handleOpts(args);
++    String remoteAppLogDir = remoteLogDir + File.separator + user
++        + File.separator + suffix + File.separator + appId;
++
++    // Run 'hadoop archives' command in local mode
++    Configuration haConf = new Configuration(getConf());
++    haConf.set("mapreduce.framework.name", "local");
++    HadoopArchives ha = new HadoopArchives(haConf);
++    String[] haArgs = {
++        "-archiveName",
++        appId + ".har",
++        "-p",
++        remoteAppLogDir,
++        "*",
++        workingDir
++    };
++    StringBuilder sb = new StringBuilder("Executing 'hadoop archives'");
++    for (String haArg : haArgs) {
++      sb.append("\n\t").append(haArg);
++    }
++    LOG.info(sb.toString());
++    ha.run(haArgs);
++
++    FileSystem fs = null;
++    // Move har file to correct location and delete original logs
++    try {
++      fs = FileSystem.get(conf);
++      LOG.info("Moving har to original location");
++      fs.rename(new Path(workingDir, appId + ".har"),
++          new Path(remoteAppLogDir, appId + ".har"));
++      LOG.info("Deleting original logs");
++      for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir),
++          new PathFilter() {
++            @Override
++            public boolean accept(Path path) {
++              return !path.getName().endsWith(".har");
++            }
++          })) {
++        fs.delete(original.getPath(), false);
++      }
++    } finally {
++      if (fs != null) {
++        fs.close();
++      }
++    }
++
++    return 0;
++  }
++
++  /**
++   * Parses the command line and stores the value of each of the five
++   * required options in the corresponding instance field.
++   *
++   * @param args raw command-line arguments
++   * @throws ParseException if the parser rejects the arguments
++   */
++  private void handleOpts(String[] args) throws ParseException {
++    // Every option takes an argument; registration order matches the fields
++    Option[] requiredOptions = {
++        new Option(APP_ID_OPTION, true, "Application ID"),
++        new Option(USER_OPTION, true, "User"),
++        new Option(WORKING_DIR_OPTION, true,
++            "Working Directory"),
++        new Option(REMOTE_ROOT_LOG_DIR, true,
++            "Remote Root Log Directory"),
++        new Option(SUFFIX_OPTION, true, "Suffix")
++    };
++    Options allOptions = new Options();
++    for (Option option : requiredOptions) {
++      option.setRequired(true);
++      allOptions.addOption(option);
++    }
++
++    CommandLine parsed = new GnuParser().parse(allOptions, args);
++    appId = parsed.getOptionValue(APP_ID_OPTION);
++    user = parsed.getOptionValue(USER_OPTION);
++    workingDir = parsed.getOptionValue(WORKING_DIR_OPTION);
++    remoteLogDir = parsed.getOptionValue(REMOTE_ROOT_LOG_DIR);
++    suffix = parsed.getOptionValue(SUFFIX_OPTION);
++  }
++
++  /**
++   * Installs the configuration used by this tool.
++   *
++   * @param conf kept as-is when it already is a JobConf, otherwise wrapped in
++   *             a new JobConf created for HadoopArchiveLogsRunner
++   */
++  @Override
++  public void setConf(Configuration conf) {
++    this.conf = (conf instanceof JobConf)
++        ? (JobConf) conf
++        : new JobConf(conf, HadoopArchiveLogsRunner.class);
++  }
++
++  /** @return the configuration currently held in the conf field */
++  @Override
++  public Configuration getConf() {
++    return conf;
++  }
++}
+diff --git hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+new file mode 100644
+index 0000000..c8ff201
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+@@ -0,0 +1,293 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import org.apache.commons.io.IOUtils;
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
++import org.apache.hadoop.yarn.api.records.ApplicationId;
++import org.apache.hadoop.yarn.api.records.ApplicationReport;
++import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
++import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
++import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
++import org.apache.hadoop.yarn.api.records.Priority;
++import org.apache.hadoop.yarn.api.records.Resource;
++import org.apache.hadoop.yarn.api.records.YarnApplicationState;
++import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.hadoop.yarn.server.MiniYARNCluster;
++import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
++import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
++import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
++import org.junit.Assert;
++import org.junit.Test;
++
++import java.io.File;
++import java.io.IOException;
++import java.util.Random;
++
++public class TestHadoopArchiveLogs { // Tests HadoopArchiveLogs: file checks, eligibility cap, app discovery, script generation
++
++  private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis(); // shared by every ApplicationId built in this class
++  private static final int FILE_SIZE_INCREMENT = 4096; // bytes written per unit of createFile's sizeMultiple
++  private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
++  static {
++    new Random().nextBytes(DUMMY_DATA); // fill the block with random bytes once
++  }
++
++  @Test(timeout = 10000)
++  public void testCheckFiles() throws Exception { // of five candidate apps, only the one passing every file check may remain
++    Configuration conf = new Configuration();
++    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++    FileSystem fs = FileSystem.getLocal(conf); // fixtures live on the local file system under target/logs
++    Path rootLogDir = new Path("target", "logs");
++    String suffix = "logs";
++    Path logDir = new Path(rootLogDir,
++        new Path(System.getProperty("user.name"), suffix));
++    fs.mkdirs(logDir);
++
++    Assert.assertEquals(0, hal.eligibleApplications.size());
++    ApplicationReport app1 = createAppReport(1);  // no files found
++    ApplicationReport app2 = createAppReport(2);  // too few files
++    Path app2Path = new Path(logDir, app2.getApplicationId().toString());
++    fs.mkdirs(app2Path);
++    createFile(fs, new Path(app2Path, "file1"), 1);
++    hal.minNumLogFiles = 2; // app2 has a single file, one short of this minimum
++    ApplicationReport app3 = createAppReport(3);  // too large
++    Path app3Path = new Path(logDir, app3.getApplicationId().toString());
++    fs.mkdirs(app3Path);
++    createFile(fs, new Path(app3Path, "file1"), 2);
++    createFile(fs, new Path(app3Path, "file2"), 5);
++    hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6; // app3 totals 7 increments, app5 totals 5
++    ApplicationReport app4 = createAppReport(4);  // has har already
++    Path app4Path = new Path(logDir, app4.getApplicationId().toString());
++    fs.mkdirs(app4Path);
++    createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
++    ApplicationReport app5 = createAppReport(5);  // just right
++    Path app5Path = new Path(logDir, app5.getApplicationId().toString());
++    fs.mkdirs(app5Path);
++    createFile(fs, new Path(app5Path, "file1"), 2);
++    createFile(fs, new Path(app5Path, "file2"), 3);
++    hal.eligibleApplications.add(app1);
++    hal.eligibleApplications.add(app2);
++    hal.eligibleApplications.add(app3);
++    hal.eligibleApplications.add(app4);
++    hal.eligibleApplications.add(app5);
++
++    hal.checkFiles(fs, rootLogDir, suffix);
++    Assert.assertEquals(1, hal.eligibleApplications.size()); // app1..app4 must have been filtered out
++    Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
++  }
++
++  @Test(timeout = 10000)
++  public void testCheckMaxEligible() throws Exception { // each tighter cap is expected to drop the app with the latest finish time
++    Configuration conf = new Configuration();
++    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++    ApplicationReport app1 = createAppReport(1);
++    app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
++    ApplicationReport app2 = createAppReport(2);
++    app2.setFinishTime(CLUSTER_TIMESTAMP - 10); // earliest finish: the last one expected to survive
++    ApplicationReport app3 = createAppReport(3);
++    app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
++    ApplicationReport app4 = createAppReport(4);
++    app4.setFinishTime(CLUSTER_TIMESTAMP + 10); // latest finish: the first one expected to be dropped
++    ApplicationReport app5 = createAppReport(5);
++    app5.setFinishTime(CLUSTER_TIMESTAMP);
++    Assert.assertEquals(0, hal.eligibleApplications.size());
++    hal.eligibleApplications.add(app1);
++    hal.eligibleApplications.add(app2);
++    hal.eligibleApplications.add(app3);
++    hal.eligibleApplications.add(app4);
++    hal.eligibleApplications.add(app5);
++    hal.maxEligible = -1; // -1 is expected to leave all five apps in place
++    hal.checkMaxEligible();
++    Assert.assertEquals(5, hal.eligibleApplications.size());
++
++    hal.maxEligible = 4;
++    hal.checkMaxEligible();
++    Assert.assertEquals(4, hal.eligibleApplications.size());
++    Assert.assertFalse(hal.eligibleApplications.contains(app4));
++
++    hal.maxEligible = 3;
++    hal.checkMaxEligible();
++    Assert.assertEquals(3, hal.eligibleApplications.size());
++    Assert.assertFalse(hal.eligibleApplications.contains(app3));
++
++    hal.maxEligible = 2;
++    hal.checkMaxEligible();
++    Assert.assertEquals(2, hal.eligibleApplications.size());
++    Assert.assertFalse(hal.eligibleApplications.contains(app5));
++
++    hal.maxEligible = 1;
++    hal.checkMaxEligible();
++    Assert.assertEquals(1, hal.eligibleApplications.size());
++    Assert.assertFalse(hal.eligibleApplications.contains(app1));
++  }
++
++  @Test(timeout = 10000)
++  public void testFindAggregatedApps() throws Exception { // seven apps with distinct LogAggregationStatus values; two are expected to be eligible
++    MiniYARNCluster yarnCluster = null;
++    try {
++      Configuration conf = new Configuration();
++      conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
++      yarnCluster =
++          new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
++              1, 1, 1);
++      yarnCluster.init(conf);
++      yarnCluster.start();
++      conf = yarnCluster.getConfig(); // use the started cluster's configuration from here on
++
++      RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
++      RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
++          LogAggregationStatus.DISABLED);
++      RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
++          LogAggregationStatus.FAILED);
++      RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
++          LogAggregationStatus.NOT_START);
++      RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
++          LogAggregationStatus.SUCCEEDED);
++      RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
++          LogAggregationStatus.RUNNING);
++      RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
++          LogAggregationStatus.RUNNING_WITH_FAILURE);
++      RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
++          LogAggregationStatus.TIME_OUT);
++      rmContext.getRMApps().put(app1.getApplicationId(), app1); // register the apps directly in the RM context
++      rmContext.getRMApps().put(app2.getApplicationId(), app2);
++      rmContext.getRMApps().put(app3.getApplicationId(), app3);
++      rmContext.getRMApps().put(app4.getApplicationId(), app4);
++      rmContext.getRMApps().put(app5.getApplicationId(), app5);
++      rmContext.getRMApps().put(app6.getApplicationId(), app6);
++      rmContext.getRMApps().put(app7.getApplicationId(), app7);
++
++      HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++      Assert.assertEquals(0, hal.eligibleApplications.size());
++      hal.findAggregatedApps();
++      Assert.assertEquals(2, hal.eligibleApplications.size()); // NOTE(review): which two statuses qualify is decided in findAggregatedApps, not visible here
++    } finally {
++      if (yarnCluster != null) {
++        yarnCluster.stop();
++      }
++    }
++  }
++
++  @Test(timeout = 10000)
++  public void testGenerateScript() throws Exception { // checks the generated shell script line by line
++    Configuration conf = new Configuration();
++    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
++    ApplicationReport app1 = createAppReport(1);
++    ApplicationReport app2 = createAppReport(2);
++    hal.eligibleApplications.add(app1);
++    hal.eligibleApplications.add(app2);
++
++    File localScript = new File("target", "script.sh");
++    Path workingDir = new Path("/tmp", "working");
++    Path remoteRootLogDir = new Path("/tmp", "logs");
++    String suffix = "logs";
++    localScript.delete(); // remove any script left by an earlier run
++    Assert.assertFalse(localScript.exists());
++    hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix);
++    Assert.assertTrue(localScript.exists());
++    String script = IOUtils.toString(localScript.toURI());
++    String[] lines = script.split(System.lineSeparator());
++    Assert.assertEquals(16, lines.length); // the script for two apps is expected to have exactly 16 lines
++    Assert.assertEquals("#!/bin/bash", lines[0]);
++    Assert.assertEquals("set -e", lines[1]);
++    Assert.assertEquals("set -x", lines[2]);
++    Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
++    if (lines[4].contains(app1.getApplicationId().toString())) { // the two apps may appear in either order
++      Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
++          + "\"", lines[4]);
++      Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
++          + "\"", lines[7]);
++    } else {
++      Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
++          + "\"", lines[4]);
++      Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
++          + "\"", lines[7]);
++    }
++    Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
++        lines[5]);
++    Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
++    Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
++        lines[8]);
++    Assert.assertEquals("else", lines[9]);
++    Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
++    Assert.assertEquals("\texit 1", lines[11]);
++    Assert.assertEquals("fi", lines[12]);
++    Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
++    Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH=")); // only the prefix is checked; the classpath value varies
++    Assert.assertEquals("\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." +
++        "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir "
++        + workingDir.toString() + " -remoteRootLogDir " +
++        remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
++  }
++
++  private static ApplicationReport createAppReport(int id) { // FINISHED/SUCCEEDED report owned by the current user, keyed by CLUSTER_TIMESTAMP and id
++    ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
++    return ApplicationReport.newInstance(
++        appId,
++        ApplicationAttemptId.newInstance(appId, 1),
++        System.getProperty("user.name"),
++        null, null, null, 0, null, YarnApplicationState.FINISHED, null,
++        null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
++        null, null);
++  }
++
++  private static void createFile(FileSystem fs, Path p, long sizeMultiple) // writes sizeMultiple copies of DUMMY_DATA to p
++      throws IOException {
++    FSDataOutputStream out = null;
++    try {
++      out = fs.create(p);
++      for (int i = 0 ; i < sizeMultiple; i++) {
++        out.write(DUMMY_DATA);
++      }
++    } finally {
++      if (out != null) {
++        out.close();
++      }
++    }
++  }
++
++  private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext, // builds an RMAppImpl whose reports always carry aggStatus
++       final LogAggregationStatus aggStatus) {
++    ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
++    ApplicationSubmissionContext submissionContext =
++        ApplicationSubmissionContext.newInstance(appId, "test", "default",
++            Priority.newInstance(0), null, false, true,
++            2, Resource.newInstance(10, 2), "test");
++    return new RMAppImpl(appId, rmContext, conf, "test",
++        System.getProperty("user.name"), "default", submissionContext,
++        rmContext.getScheduler(),
++        rmContext.getApplicationMasterService(),
++        System.currentTimeMillis(), "test",
++        null, null) {
++      @Override
++      public ApplicationReport createAndGetApplicationReport(
++          String clientUserName, boolean allowAccess) {
++        ApplicationReport report =
++            super.createAndGetApplicationReport(clientUserName, allowAccess);
++        report.setLogAggregationStatus(aggStatus); // force the status under test onto the real report
++        return report;
++      }
++    };
++  }
++}
+diff --git hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
+new file mode 100644
+index 0000000..af66f14
+--- /dev/null
++++ hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java
+@@ -0,0 +1,143 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.hadoop.tools;
++
++import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FSDataOutputStream;
++import org.apache.hadoop.fs.FileStatus;
++import org.apache.hadoop.fs.FileSystem;
++import org.apache.hadoop.fs.HarFs;
++import org.apache.hadoop.fs.Path;
++import org.apache.hadoop.hdfs.MiniDFSCluster;
++import org.apache.hadoop.util.ToolRunner;
++import org.apache.hadoop.yarn.api.records.ApplicationId;
++import org.apache.hadoop.yarn.conf.YarnConfiguration;
++import org.apache.hadoop.yarn.server.MiniYARNCluster;
++import org.junit.Assert;
++import org.junit.Test;
++
++import java.io.IOException;
++import java.util.Arrays;
++import java.util.Comparator;
++import java.util.Random;
++
++import static org.junit.Assert.assertEquals;
++
++public class TestHadoopArchiveLogsRunner { // End-to-end test of HadoopArchiveLogsRunner on mini YARN and DFS clusters
++
++  private static final int FILE_SIZE_INCREMENT = 4096; // bytes written per unit of createFile's sizeMultiple
++  private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
++  static {
++    new Random().nextBytes(DUMMY_DATA); // fill the block with random bytes once
++  }
++
++  @Test(timeout = 30000)
++  public void testHadoopArchiveLogs() throws Exception { // three logs go in; one har holding the same three files must come out
++    MiniYARNCluster yarnCluster = null;
++    MiniDFSCluster dfsCluster = null;
++    FileSystem fs = null;
++    try {
++      Configuration conf = new YarnConfiguration();
++      conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
++      conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
++      yarnCluster =
++          new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
++              1, 2, 1, 1);
++      yarnCluster.init(conf);
++      yarnCluster.start();
++      conf = yarnCluster.getConfig(); // use the started cluster's configuration from here on
++      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
++
++      ApplicationId app1 =
++          ApplicationId.newInstance(System.currentTimeMillis(), 1);
++      fs = FileSystem.get(conf);
++      Path remoteRootLogDir = new Path(conf.get(
++          YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
++          YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
++      Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
++      String suffix = "logs";
++      Path logDir = new Path(remoteRootLogDir,
++          new Path(System.getProperty("user.name"), suffix));
++      fs.mkdirs(logDir);
++      Path app1Path = new Path(logDir, app1.toString());
++      fs.mkdirs(app1Path);
++      createFile(fs, new Path(app1Path, "log1"), 3); // the sizes 3, 4 and 2 are re-checked inside the har below
++      createFile(fs, new Path(app1Path, "log2"), 4);
++      createFile(fs, new Path(app1Path, "log3"), 2);
++      FileStatus[] app1Files = fs.listStatus(app1Path);
++      Assert.assertEquals(3, app1Files.length);
++
++      String[] args = new String[]{
++          "-appId", app1.toString(),
++          "-user", System.getProperty("user.name"),
++          "-workingDir", workingDir.toString(),
++          "-remoteRootLogDir", remoteRootLogDir.toString(),
++          "-suffix", suffix};
++      final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
++      assertEquals(0, ToolRunner.run(halr, args));
++
++      fs = FileSystem.get(conf); // NOTE(review): presumably re-fetched because the runner closes the FileSystem it obtained; confirm
++      app1Files = fs.listStatus(app1Path);
++      Assert.assertEquals(1, app1Files.length); // the three originals are gone, only the har is left
++      FileStatus harFile = app1Files[0];
++      Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName());
++      Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath()); // address the archive through the har scheme
++      FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath);
++      Assert.assertEquals(3, harLogs.length);
++      Arrays.sort(harLogs, new Comparator<FileStatus>() { // listing order is not relied on; sort by file name
++        @Override
++        public int compare(FileStatus o1, FileStatus o2) {
++          return o1.getPath().getName().compareTo(o2.getPath().getName());
++        }
++      });
++      Assert.assertEquals("log1", harLogs[0].getPath().getName());
++      Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
++      Assert.assertEquals("log2", harLogs[1].getPath().getName());
++      Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
++      Assert.assertEquals("log3", harLogs[2].getPath().getName());
++      Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
++      Assert.assertEquals(0, fs.listStatus(workingDir).length); // the har must have been moved out of the working directory
++    } finally {
++      if (yarnCluster != null) {
++        yarnCluster.stop();
++      }
++      if (fs != null) {
++        fs.close();
++      }
++      if (dfsCluster != null) {
++        dfsCluster.shutdown();
++      }
++    }
++  }
++
++  private static void createFile(FileSystem fs, Path p, long sizeMultiple) // writes sizeMultiple copies of DUMMY_DATA to p
++      throws IOException {
++    FSDataOutputStream out = null;
++    try {
++      out = fs.create(p);
++      for (int i = 0 ; i < sizeMultiple; i++) {
++        out.write(DUMMY_DATA);
++      }
++    } finally {
++      if (out != null) {
++        out.close();
++      }
++    }
++  }
++}
+diff --git hadoop-tools/hadoop-tools-dist/pom.xml hadoop-tools/hadoop-tools-dist/pom.xml
+index 540401d..e6c458f 100644
+--- hadoop-tools/hadoop-tools-dist/pom.xml
++++ hadoop-tools/hadoop-tools-dist/pom.xml
+@@ -52,6 +52,11 @@
+     </dependency>
+     <dependency>
+       <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-archive-logs</artifactId>
++      <scope>compile</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-rumen</artifactId>
+       <scope>compile</scope>
+     </dependency>
+diff --git hadoop-tools/pom.xml hadoop-tools/pom.xml
+index 5b35f46..0061bf0 100644
+--- hadoop-tools/pom.xml
++++ hadoop-tools/pom.xml
+@@ -34,6 +34,7 @@
+     <module>hadoop-streaming</module>
+     <module>hadoop-distcp</module>
+     <module>hadoop-archives</module>
++    <module>hadoop-archive-logs</module>
+     <module>hadoop-rumen</module>
+     <module>hadoop-gridmix</module>
+     <module>hadoop-datajoin</module>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
----------------------------------------------------------------------
diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
index fa55703..3f646e6 100644
--- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
+++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml
@@ -52,6 +52,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>../hadoop-archive-logs/target</directory>
+      <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
+      <includes>
+        <include>*-sources.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>../hadoop-datajoin/target</directory>
       <outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
       <includes>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5b5724b..428d37e 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -290,6 +290,9 @@ Release 2.8.0 - UNRELEASED
    MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
    (Naganarasimha G R via wangda)
 
+   MAPREDUCE-6415. Create a tool to combine aggregated logs into HAR files. 
+   (Robert Kanter via kasha)
+
   IMPROVEMENTS
 
     MAPREDUCE-6291. Correct mapred queue usage command.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-mapreduce-project/bin/mapred
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred
index 426af80..2d56a8d 100755
--- a/hadoop-mapreduce-project/bin/mapred
+++ b/hadoop-mapreduce-project/bin/mapred
@@ -20,6 +20,7 @@ MYNAME="${BASH_SOURCE-$0}"
 function hadoop_usage
 {
   hadoop_add_subcommand "archive" "create a hadoop archive"
+  hadoop_add_subcommand "archive-logs" "combine aggregated logs into hadoop archives"
   hadoop_add_subcommand "classpath" "prints the class path needed for running mapreduce subcommands"
   hadoop_add_subcommand "distcp" "copy file or directories recursively"
   hadoop_add_subcommand "historyserver" "run job history servers as a standalone daemon"
@@ -72,6 +73,13 @@ case ${COMMAND} in
     hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
     HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
   ;;
+  archive-logs)
+    CLASS=org.apache.hadoop.tools.HadoopArchiveLogs
+    hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
+    hadoop_add_classpath "${TOOL_PATH}"
+    hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
+    HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
+  ;;
   classpath)
     hadoop_do_classpath_subcommand CLASS "$@"
   ;;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 9863475..636e063 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -324,6 +324,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-archive-logs</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-distcp</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-tools/hadoop-archive-logs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/pom.xml b/hadoop-tools/hadoop-archive-logs/pom.xml
new file mode 100644
index 0000000..2a480a8
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/pom.xml
@@ -0,0 +1,171 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+                      http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-project</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+    <relativePath>../../hadoop-project</relativePath>
+  </parent>
+  <groupId>org.apache.hadoop</groupId>
+  <artifactId>hadoop-archive-logs</artifactId>
+  <version>3.0.0-SNAPSHOT</version>
+  <description>Apache Hadoop Archive Logs</description>
+  <name>Apache Hadoop Archive Logs</name>
+  <packaging>jar</packaging>
+
+  <properties>
+    <hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-applications-distributedshell</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-archives</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-cli</groupId>
+      <artifactId>commons-cli</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>create-log-dir</id>
+            <phase>process-test-resources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <delete dir="${test.build.data}"/>
+                <mkdir dir="${test.build.data}"/>
+                <mkdir dir="${hadoop.log.dir}"/>
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+         <configuration>
+          <archive>
+           <manifest>
+            <mainClass>org.apache.hadoop.tools.HadoopArchiveLogs</mainClass>
+           </manifest>
+         </archive>
+        </configuration>
+       </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
new file mode 100644
index 0000000..4778dcb
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
+import org.apache.hadoop.yarn.applications.distributedshell.Client;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+
+/**
+ * This tool moves Aggregated Log files into HAR archives using the
+ * {@link HadoopArchives} tool and the Distributed Shell via the
+ * {@link HadoopArchiveLogsRunner}.
+ *
+ * <p>{@link #run(String[])} parses the command line, collects the applications
+ * whose log aggregation has finished, filters them by the number and total
+ * size of their aggregated log files, optionally caps how many are processed,
+ * writes a bash launcher script and submits a Distributed Shell application
+ * with one container per remaining application.
+ */
+public class HadoopArchiveLogs implements Tool {
+  private static final Log LOG = LogFactory.getLog(HadoopArchiveLogs.class);
+
+  // Names of the supported command line options.
+  private static final String HELP_OPTION = "help";
+  private static final String MAX_ELIGIBLE_APPS_OPTION = "maxEligibleApps";
+  private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
+  private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
+  private static final String MEMORY_OPTION = "memory";
+
+  // Default option values; the two sizes are expressed in megabytes.
+  private static final int DEFAULT_MAX_ELIGIBLE = -1;
+  private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
+  private static final long DEFAULT_MAX_TOTAL_LOGS_SIZE = 1024L;
+  private static final long DEFAULT_MEMORY = 1024L;
+
+  /** Maximum number of applications to process; non-positive means no cap. */
+  @VisibleForTesting
+  int maxEligible = DEFAULT_MAX_ELIGIBLE;
+  /** Minimum number of log files an application needs to be eligible. */
+  @VisibleForTesting
+  int minNumLogFiles = DEFAULT_MIN_NUM_LOG_FILES;
+  /** Maximum total size, in bytes, of an eligible application's logs. */
+  @VisibleForTesting
+  long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
+  /** Memory, in megabytes, requested for each container. */
+  @VisibleForTesting
+  long memory = DEFAULT_MEMORY;
+
+  /** Applications that are still candidates for archiving. */
+  @VisibleForTesting
+  Set<ApplicationReport> eligibleApplications;
+
+  private JobConf conf;
+
+  /**
+   * Creates the tool with the given configuration and no eligible
+   * applications.
+   *
+   * @param conf the configuration to use; see {@link #setConf(Configuration)}
+   */
+  public HadoopArchiveLogs(Configuration conf) {
+    setConf(conf);
+    eligibleApplications = new HashSet<>();
+  }
+
+  /**
+   * Command line entry point.  Runs the tool through {@link ToolRunner} and
+   * exits the JVM with the tool's return code, or with 1 if it threw.
+   *
+   * @param args the command line arguments
+   */
+  public static void main(String[] args) {
+    JobConf job = new JobConf(HadoopArchiveLogs.class);
+
+    HadoopArchiveLogs hal = new HadoopArchiveLogs(job);
+    int ret = 0;
+
+    try{
+      ret = ToolRunner.run(hal, args);
+    } catch(Exception e) {
+      LOG.debug("Exception", e);
+      System.err.println(e.getClass().getSimpleName());
+      final String s = e.getLocalizedMessage();
+      if (s != null) {
+        System.err.println(s);
+      } else {
+        e.printStackTrace(System.err);
+      }
+      System.exit(1);
+    }
+    System.exit(ret);
+  }
+
+  /**
+   * Selects the applications to archive and launches the Distributed Shell
+   * job that archives them.
+   *
+   * @param args the command line arguments
+   * @return 0 if the Distributed Shell client reported success or there was
+   *         nothing to do, -1 if the client reported failure
+   * @throws Exception if option parsing, file system access, script
+   *         generation or the Distributed Shell client fails
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    if (!handleOpts(args)) {
+      return 0;
+    }
+
+    findAggregatedApps();
+
+    Path remoteRootLogDir = new Path(conf.get(
+        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+    Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+    try (FileSystem fs = FileSystem.get(conf)) {
+      checkFiles(fs, remoteRootLogDir, suffix);
+
+      // Prepare working directory: start from an empty directory that only
+      // its owner can access.
+      if (fs.exists(workingDir)) {
+        fs.delete(workingDir, true);
+      }
+      fs.mkdirs(workingDir);
+      fs.setPermission(workingDir,
+          new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+    }
+
+    checkMaxEligible();
+
+    if (eligibleApplications.isEmpty()) {
+      LOG.info("No eligible applications to process");
+      // Return instead of calling System.exit() so that the tool can also be
+      // driven programmatically; main() turns this into exit status 0.
+      return 0;
+    }
+
+    StringBuilder sb =
+        new StringBuilder("Will process the following applications:");
+    for (ApplicationReport report : eligibleApplications) {
+      sb.append("\n\t").append(report.getApplicationId());
+    }
+    LOG.info(sb.toString());
+
+    File localScript = File.createTempFile("hadoop-archive-logs-", ".sh");
+    generateScript(localScript, workingDir, remoteRootLogDir, suffix);
+
+    return runDistributedShell(localScript) ? 0 : -1;
+  }
+
+  /**
+   * Parses the command line and stores the option values in this instance.
+   *
+   * @param args the command line arguments
+   * @return {@code true} if processing should continue, {@code false} if
+   *         there is nothing left to do (usage was requested, or
+   *         maxEligibleApps was set to 0)
+   * @throws ParseException if the arguments cannot be parsed; the usage
+   *         message is printed before the exception is rethrown
+   */
+  private boolean handleOpts(String[] args) throws ParseException {
+    Options opts = new Options();
+    Option helpOpt = new Option(HELP_OPTION, false, "Prints this message");
+    Option maxEligibleOpt = new Option(MAX_ELIGIBLE_APPS_OPTION, true,
+        "The maximum number of eligible apps to process (default: "
+            + DEFAULT_MAX_ELIGIBLE + " (all))");
+    maxEligibleOpt.setArgName("n");
+    Option minNumLogFilesOpt = new Option(MIN_NUM_LOG_FILES_OPTION, true,
+        "The minimum number of log files required to be eligible (default: "
+            + DEFAULT_MIN_NUM_LOG_FILES + ")");
+    minNumLogFilesOpt.setArgName("n");
+    Option maxTotalLogsSizeOpt = new Option(MAX_TOTAL_LOGS_SIZE_OPTION, true,
+        "The maximum total logs size (in megabytes) required to be eligible" +
+            " (default: " + DEFAULT_MAX_TOTAL_LOGS_SIZE + ")");
+    maxTotalLogsSizeOpt.setArgName("megabytes");
+    Option memoryOpt = new Option(MEMORY_OPTION, true,
+        "The amount of memory (in megabytes) for each container (default: "
+            + DEFAULT_MEMORY + ")");
+    memoryOpt.setArgName("megabytes");
+    opts.addOption(helpOpt);
+    opts.addOption(maxEligibleOpt);
+    opts.addOption(minNumLogFilesOpt);
+    opts.addOption(maxTotalLogsSizeOpt);
+    opts.addOption(memoryOpt);
+
+    try {
+      CommandLineParser parser = new GnuParser();
+      CommandLine commandLine = parser.parse(opts, args);
+      if (commandLine.hasOption(HELP_OPTION)) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("yarn archive-logs", opts);
+        return false;
+      }
+      if (commandLine.hasOption(MAX_ELIGIBLE_APPS_OPTION)) {
+        maxEligible = Integer.parseInt(
+            commandLine.getOptionValue(MAX_ELIGIBLE_APPS_OPTION));
+        if (maxEligible == 0) {
+          LOG.info("Setting " + MAX_ELIGIBLE_APPS_OPTION + " to 0 accomplishes "
+              + "nothing. Please either set it to a negative value "
+              + "(default, all) or a more reasonable value.");
+          return false;
+        }
+      }
+      if (commandLine.hasOption(MIN_NUM_LOG_FILES_OPTION)) {
+        minNumLogFiles = Integer.parseInt(
+            commandLine.getOptionValue(MIN_NUM_LOG_FILES_OPTION));
+      }
+      if (commandLine.hasOption(MAX_TOTAL_LOGS_SIZE_OPTION)) {
+        // The option is given in megabytes but the field holds bytes.
+        maxTotalLogsSize = Long.parseLong(
+            commandLine.getOptionValue(MAX_TOTAL_LOGS_SIZE_OPTION));
+        maxTotalLogsSize *= 1024L * 1024L;
+      }
+      if (commandLine.hasOption(MEMORY_OPTION)) {
+        memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
+      }
+    } catch (ParseException pe) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("yarn archive-logs", opts);
+      throw pe;
+    }
+    return true;
+  }
+
+  /**
+   * Adds to {@link #eligibleApplications} every application reported by the
+   * YARN client whose log aggregation status is SUCCEEDED or FAILED.
+   *
+   * @throws IOException if the YARN client fails with an I/O error
+   * @throws YarnException if the YARN client fails otherwise
+   */
+  @VisibleForTesting
+  void findAggregatedApps() throws IOException, YarnException {
+    YarnClient client = YarnClient.createYarnClient();
+    try {
+      client.init(getConf());
+      client.start();
+      List<ApplicationReport> reports = client.getApplications();
+      for (ApplicationReport report : reports) {
+        LogAggregationStatus aggStatus = report.getLogAggregationStatus();
+        // Enum identity comparison: a report without a status (null) is
+        // skipped instead of triggering a NullPointerException.
+        if (aggStatus == LogAggregationStatus.SUCCEEDED ||
+            aggStatus == LogAggregationStatus.FAILED) {
+          eligibleApplications.add(report);
+        }
+      }
+    } finally {
+      client.stop();
+    }
+  }
+
+  /**
+   * Removes from {@link #eligibleApplications} every application whose
+   * aggregated logs should not be archived; see
+   * {@link #shouldArchive(FileSystem, Path, String, ApplicationReport)}.
+   *
+   * @param fs the file system holding the aggregated logs
+   * @param remoteRootLogDir the root directory of the aggregated logs
+   * @param suffix the remote node log directory suffix
+   */
+  @VisibleForTesting
+  void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
+    for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
+         reportIt.hasNext(); ) {
+      ApplicationReport report = reportIt.next();
+      // The decision is taken first and remove() is called at most once per
+      // element; calling it twice would throw IllegalStateException.
+      if (!shouldArchive(fs, remoteRootLogDir, suffix, report)) {
+        reportIt.remove();
+      }
+    }
+  }
+
+  /**
+   * Decides whether the aggregated logs of one application should be
+   * archived.
+   *
+   * @param fs the file system holding the aggregated logs
+   * @param remoteRootLogDir the root directory of the aggregated logs
+   * @param suffix the remote node log directory suffix
+   * @param report the application to check
+   * @return {@code false} if the log directory cannot be listed, holds fewer
+   *         than {@link #minNumLogFiles} entries, already contains the
+   *         application's har, or its files total more than
+   *         {@link #maxTotalLogsSize} bytes; {@code true} otherwise
+   */
+  private boolean shouldArchive(FileSystem fs, Path remoteRootLogDir,
+      String suffix, ApplicationReport report) {
+    try {
+      FileStatus[] files = fs.listStatus(
+          LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
+              report.getApplicationId(), report.getUser(), suffix));
+      if (files.length < minNumLogFiles) {
+        return false;
+      }
+      String harName = report.getApplicationId() + ".har";
+      long totalFileSize = 0L;
+      for (FileStatus file : files) {
+        if (file.getPath().getName().equals(harName)) {
+          // Already archived
+          return false;
+        }
+        totalFileSize += file.getLen();
+      }
+      return totalFileSize <= maxTotalLogsSize;
+    } catch (IOException ioe) {
+      // If the user doesn't have permission or it doesn't exist, then skip it
+      return false;
+    }
+  }
+
+  /**
+   * Trims {@link #eligibleApplications} down to {@link #maxEligible} entries
+   * when a positive limit is set, keeping the applications with the earliest
+   * finish times.
+   */
+  @VisibleForTesting
+  void checkMaxEligible() {
+    // If we have too many eligible apps, remove the newest ones first
+    if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
+      List<ApplicationReport> sortedApplications =
+          new ArrayList<>(eligibleApplications);
+      // Ascending finish time, so everything from index maxEligible onwards
+      // is newer than the applications that are kept.
+      Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
+        @Override
+        public int compare(ApplicationReport o1, ApplicationReport o2) {
+          return Long.compare(o1.getFinishTime(), o2.getFinishTime());
+        }
+      });
+      for (int i = maxEligible; i < sortedApplications.size(); i++) {
+        eligibleApplications.remove(sortedApplications.get(i));
+      }
+    }
+  }
+
+  /*
+  The generated script looks like this:
+  #!/bin/bash
+  set -e
+  set -x
+  if [ "$YARN_SHELL_ID" == "1" ]; then
+        appId="application_1440448768987_0001"
+        user="rkanter"
+  elif [ "$YARN_SHELL_ID" == "2" ]; then
+        appId="application_1440448768987_0002"
+        user="rkanter"
+  else
+        echo "Unknown Mapping!"
+        exit 1
+  fi
+  export HADOOP_CLIENT_OPTS="-Xmx1024m"
+  export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar
+  "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs
+   */
+  /**
+   * Writes the bash script that each container runs.  The script maps the
+   * container's {@code YARN_SHELL_ID} to one eligible application and then
+   * invokes {@link HadoopArchiveLogsRunner} for it.
+   *
+   * @param localScript the local file to write the script to
+   * @param workingDir the working directory passed to the runner
+   * @param remoteRootLogDir the remote root log directory passed to the runner
+   * @param suffix the remote node log directory suffix passed to the runner
+   * @throws IOException if the script cannot be written
+   */
+  @VisibleForTesting
+  void generateScript(File localScript, Path workingDir,
+        Path remoteRootLogDir, String suffix) throws IOException {
+    LOG.info("Generating script at: " + localScript.getAbsolutePath());
+    String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
+        .getCodeSource().getLocation().getPath();
+    String harJarPath = HadoopArchives.class.getProtectionDomain()
+        .getCodeSource().getLocation().getPath();
+    String classpath = halrJarPath + File.pathSeparator + harJarPath;
+    try (FileWriter fw = new FileWriter(localScript)) {
+      fw.write("#!/bin/bash\nset -e\nset -x\n");
+      int containerCount = 1;
+      for (ApplicationReport report : eligibleApplications) {
+        // Every branch ends with "el": followed by the next "if" it reads
+        // "elif", and followed by the "se" written after the loop, "else".
+        fw.write("if [ \"$YARN_SHELL_ID\" == \"");
+        fw.write(Integer.toString(containerCount));
+        fw.write("\" ]; then\n\tappId=\"");
+        fw.write(report.getApplicationId().toString());
+        fw.write("\"\n\tuser=\"");
+        fw.write(report.getUser());
+        fw.write("\"\nel");
+        containerCount++;
+      }
+      fw.write("se\n\techo \"Unknown Mapping!\"\n\texit 1\nfi\n");
+      fw.write("export HADOOP_CLIENT_OPTS=\"-Xmx");
+      fw.write(Long.toString(memory));
+      fw.write("m\"\n");
+      fw.write("export HADOOP_CLASSPATH=");
+      fw.write(classpath);
+      fw.write("\n\"$HADOOP_HOME\"/bin/hadoop ");
+      fw.write(HadoopArchiveLogsRunner.class.getName());
+      fw.write(" -appId \"$appId\" -user \"$user\" -workingDir ");
+      fw.write(workingDir.toString());
+      fw.write(" -remoteRootLogDir ");
+      fw.write(remoteRootLogDir.toString());
+      fw.write(" -suffix ");
+      fw.write(suffix);
+      fw.write("\n");
+    }
+  }
+
+  /**
+   * Runs the Distributed Shell client with the generated script, requesting
+   * one container per eligible application.
+   *
+   * @param localScript the script each container executes
+   * @return the result reported by the Distributed Shell client
+   * @throws Exception if the client fails to initialize or run
+   */
+  private boolean runDistributedShell(File localScript) throws Exception {
+    String[] dsArgs = {
+        "--appname",
+        "ArchiveLogs",
+        "--jar",
+        ApplicationMaster.class.getProtectionDomain().getCodeSource()
+            .getLocation().getPath(),
+        "--num_containers",
+        Integer.toString(eligibleApplications.size()),
+        "--container_memory",
+        Long.toString(memory),
+        "--shell_script",
+        localScript.getAbsolutePath()
+    };
+    final Client dsClient = new Client(new Configuration(conf));
+    dsClient.init(dsArgs);
+    return dsClient.run();
+  }
+
+  /**
+   * Stores the configuration, wrapping it in a {@link JobConf} unless it
+   * already is one.
+   *
+   * @param conf the configuration to use
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf, HadoopArchiveLogs.class);
+    }
+  }
+
+  /**
+   * @return the {@link JobConf} currently used by this tool
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119cc75e/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
new file mode 100644
index 0000000..347e5fb
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.File;
+
+/**
+ * This is a child program designed to be used by the {@link HadoopArchiveLogs}
+ * tool via the Distributed Shell.  It's not meant to be run directly.
+ *
+ * <p>For a single application it runs {@link HadoopArchives} in local mode to
+ * build a har of the application's aggregated logs in the working directory,
+ * moves the har into the application's log directory and then deletes the
+ * original log files.
+ */
+public class HadoopArchiveLogsRunner implements Tool {
+  private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class);
+
+  // Names of the (all mandatory) command line options.
+  private static final String APP_ID_OPTION = "appId";
+  private static final String USER_OPTION = "user";
+  private static final String WORKING_DIR_OPTION = "workingDir";
+  private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir";
+  private static final String SUFFIX_OPTION = "suffix";
+
+  // Option values, populated by handleOpts().
+  private String appId;
+  private String user;
+  private String workingDir;
+  private String remoteLogDir;
+  private String suffix;
+
+  private JobConf conf;
+
+  /**
+   * Creates the runner with the given configuration.
+   *
+   * @param conf the configuration to use; see {@link #setConf(Configuration)}
+   */
+  public HadoopArchiveLogsRunner(Configuration conf) {
+    setConf(conf);
+  }
+
+  /**
+   * Command line entry point.  Runs the tool through {@link ToolRunner} and
+   * exits the JVM with the tool's return code, or with 1 if it threw.
+   *
+   * @param args the command line arguments
+   */
+  public static void main(String[] args) {
+    JobConf job = new JobConf(HadoopArchiveLogsRunner.class);
+
+    HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(job);
+    int ret = 0;
+
+    try{
+      ret = ToolRunner.run(halr, args);
+    } catch(Exception e) {
+      LOG.debug("Exception", e);
+      System.err.println(e.getClass().getSimpleName());
+      final String s = e.getLocalizedMessage();
+      if (s != null) {
+        System.err.println(s);
+      } else {
+        e.printStackTrace(System.err);
+      }
+      System.exit(1);
+    }
+    System.exit(ret);
+  }
+
+  /**
+   * Archives the aggregated logs of one application and replaces the original
+   * log files with the resulting har.
+   *
+   * @param args the command line arguments
+   * @return 0 on success; -1 if the archive could not be created or moved,
+   *         in which case the original logs are not deleted
+   * @throws Exception if option parsing, the archive tool or the file system
+   *         fails
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    handleOpts(args);
+    String remoteAppLogDir = remoteLogDir + File.separator + user
+        + File.separator + suffix + File.separator + appId;
+    String harName = appId + ".har";
+
+    // Run 'hadoop archives' command in local mode
+    Configuration haConf = new Configuration(getConf());
+    haConf.set("mapreduce.framework.name", "local");
+    HadoopArchives ha = new HadoopArchives(haConf);
+    String[] haArgs = {
+        "-archiveName",
+        harName,
+        "-p",
+        remoteAppLogDir,
+        "*",
+        workingDir
+    };
+    StringBuilder sb = new StringBuilder("Executing 'hadoop archives'");
+    for (String haArg : haArgs) {
+      sb.append("\n\t").append(haArg);
+    }
+    LOG.info(sb.toString());
+    int exitCode = ha.run(haArgs);
+    if (exitCode != 0) {
+      // Without a har the original logs are the only copy: keep them.
+      LOG.error("Failed to create archive for " + appId
+          + " ('hadoop archives' returned " + exitCode + ")");
+      return -1;
+    }
+
+    // Move har file to correct location and delete original logs
+    try (FileSystem fs = FileSystem.get(conf)) {
+      LOG.info("Moving har to original location");
+      Path harSource = new Path(workingDir, harName);
+      Path harTarget = new Path(remoteAppLogDir, harName);
+      if (!fs.rename(harSource, harTarget)) {
+        // Never delete the originals unless the har is in place.
+        LOG.error("Failed to move " + harSource + " to " + harTarget);
+        return -1;
+      }
+      LOG.info("Deleting original logs");
+      PathFilter notHar = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+          return !path.getName().endsWith(".har");
+        }
+      };
+      for (FileStatus original
+          : fs.listStatus(new Path(remoteAppLogDir), notHar)) {
+        fs.delete(original.getPath(), false);
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Parses the command line and stores the option values in this instance.
+   * All five options are required.
+   *
+   * @param args the command line arguments
+   * @throws ParseException if the arguments cannot be parsed, e.g. because a
+   *         required option is missing
+   */
+  private void handleOpts(String[] args) throws ParseException {
+    Options opts = new Options();
+    Option appIdOpt = new Option(APP_ID_OPTION, true, "Application ID");
+    appIdOpt.setRequired(true);
+    Option userOpt = new Option(USER_OPTION, true, "User");
+    userOpt.setRequired(true);
+    Option workingDirOpt = new Option(WORKING_DIR_OPTION, true,
+        "Working Directory");
+    workingDirOpt.setRequired(true);
+    Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true,
+        "Remote Root Log Directory");
+    remoteLogDirOpt.setRequired(true);
+    Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix");
+    suffixOpt.setRequired(true);
+    opts.addOption(appIdOpt);
+    opts.addOption(userOpt);
+    opts.addOption(workingDirOpt);
+    opts.addOption(remoteLogDirOpt);
+    opts.addOption(suffixOpt);
+
+    CommandLineParser parser = new GnuParser();
+    CommandLine commandLine = parser.parse(opts, args);
+    appId = commandLine.getOptionValue(APP_ID_OPTION);
+    user = commandLine.getOptionValue(USER_OPTION);
+    workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION);
+    remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR);
+    suffix = commandLine.getOptionValue(SUFFIX_OPTION);
+  }
+
+  /**
+   * Stores the configuration, wrapping it in a {@link JobConf} unless it
+   * already is one.
+   *
+   * @param conf the configuration to use
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf, HadoopArchiveLogsRunner.class);
+    }
+  }
+
+  /**
+   * @return the {@link JobConf} currently used by this tool
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+}


Mime
View raw message