hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject [2/2] hadoop git commit: YARN-6877. Create an abstract log reader for extendability. Contributed by Xuan Gong.
Date Fri, 01 Sep 2017 10:02:52 GMT
YARN-6877. Create an abstract log reader for extendability. Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/119220b8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/119220b8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/119220b8

Branch: refs/heads/branch-2
Commit: 119220b88f196f27f9bf7beeef7875da506cb103
Parents: 2442a8d
Author: Junping Du <junping_du@apache.org>
Authored: Fri Sep 1 03:04:55 2017 -0700
Committer: Junping Du <junping_du@apache.org>
Committed: Fri Sep 1 03:04:55 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/client/cli/TestLogsCLI.java     |  11 +-
 .../logaggregation/LogAggregationWebUtils.java  | 124 +++++
 .../yarn/logaggregation/LogCLIHelpers.java      | 449 +++----------------
 .../yarn/logaggregation/LogToolUtils.java       | 167 -------
 .../LogAggregationFileController.java           |  79 ++++
 .../filecontroller/LogAggregationHtmlBlock.java | 186 ++++++++
 .../LogAggregationTFileController.java          | 127 ------
 .../tfile/LogAggregationTFileController.java    | 375 ++++++++++++++++
 .../tfile/TFileAggregatedLogsBlock.java         | 241 ++++++++++
 .../filecontroller/tfile/package-info.java      |  20 +
 .../yarn/webapp/log/AggregatedLogsBlock.java    | 313 ++-----------
 .../src/main/resources/yarn-default.xml         |   2 +-
 .../logaggregation/TestAggregatedLogsBlock.java |  82 +++-
 ...TestLogAggregationFileControllerFactory.java |  39 +-
 .../webapp/AHSWebServices.java                  |  31 +-
 .../logaggregation/AppLogAggregatorImpl.java    |   2 +-
 .../nodemanager/webapp/NMWebServices.java       |  36 +-
 .../TestAppLogAggregatorImpl.java               |   2 +-
 .../TestLogAggregationService.java              |   2 +-
 19 files changed, 1299 insertions(+), 989 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index 62f78e7..1085c14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -52,6 +52,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -467,7 +468,7 @@ public class TestLogsCLI {
     assertTrue(exitCode == 0);
     assertTrue(sysOutStream.toString().contains(
         logMessage(containerId1, "syslog")));
-    assertTrue(sysOutStream.toString().contains("Log Upload Time"));
+    assertTrue(sysOutStream.toString().contains("LogLastModifiedTime"));
     assertTrue(!sysOutStream.toString().contains(
       "Logs for container " + containerId1.toString()
           + " are not present in this log-file."));
@@ -491,8 +492,12 @@ public class TestLogsCLI {
 
     String logMessage = logMessage(containerId3, "stdout");
     int fileContentSize = logMessage.getBytes().length;
-    int tailContentSize = "\nEnd of LogType:stdout\n\n".getBytes().length;
-
+    StringBuilder sb = new StringBuilder();
+    String endOfFile = "End of LogType:stdout";
+    sb.append("\n" + endOfFile + "\n");
+    sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+        + "\n\n");
+    int tailContentSize = sb.toString().length();
     // specify how many bytes we should get from logs
     // specify a position number, it would get the first n bytes from
     // container log

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java
new file mode 100644
index 0000000..c5df240
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationWebUtils.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.logaggregation;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+
+/**
+ * Utils for rendering aggregated logs block.
+ *
+ */
+@Private
+public final class LogAggregationWebUtils {
+
+  private LogAggregationWebUtils() {}
+
+  /**
+   * Parse start index from html.
+   * @param html the html
+   * @param startStr the start index string
+   * @return the startIndex
+   */
+  public static long getLogStartIndex(Block html, String startStr)
+      throws NumberFormatException {
+    long start = -4096;
+
+    if (startStr != null && !startStr.isEmpty()) {
+      start = Long.parseLong(startStr);
+    }
+    return start;
+  }
+
+  /**
+   * Parse end index from html.
+   * @param html the html
+   * @param endStr the end index string
+   * @return the endIndex
+   */
+  public static long getLogEndIndex(Block html, String endStr)
+      throws NumberFormatException {
+    long end = Long.MAX_VALUE;
+
+    if (endStr != null && !endStr.isEmpty()) {
+      end = Long.parseLong(endStr);
+    }
+    return end;
+  }
+
+  /**
+   * Verify and parse containerId.
+   * @param html the html
+   * @param containerIdStr the containerId string
+   * @return the {@link ContainerId}
+   */
+  public static ContainerId verifyAndGetContainerId(Block html,
+      String containerIdStr) {
+    if (containerIdStr == null || containerIdStr.isEmpty()) {
+      html.h1()._("Cannot get container logs without a ContainerId")._();
+      return null;
+    }
+    ContainerId containerId = null;
+    try {
+      containerId = ContainerId.fromString(containerIdStr);
+    } catch (IllegalArgumentException e) {
+      html.h1()
+          ._("Cannot get container logs for invalid containerId: "
+              + containerIdStr)._();
+      return null;
+    }
+    return containerId;
+  }
+
+  /**
+   * Verify and parse NodeId.
+   * @param html the html
+   * @param nodeIdStr the nodeId string
+   * @return the {@link NodeId}
+   */
+  public static NodeId verifyAndGetNodeId(Block html, String nodeIdStr) {
+    if (nodeIdStr == null || nodeIdStr.isEmpty()) {
+      html.h1()._("Cannot get container logs without a NodeId")._();
+      return null;
+    }
+    NodeId nodeId = null;
+    try {
+      nodeId = NodeId.fromString(nodeIdStr);
+    } catch (IllegalArgumentException e) {
+      html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
+          ._();
+      return null;
+    }
+    return nodeId;
+  }
+
+  /**
+   * Verify and parse the application owner.
+   * @param html the html
+   * @param appOwner the Application owner
+   * @return the appOwner
+   */
+  public static String verifyAndGetAppOwner(Block html, String appOwner) {
+    if (appOwner == null || appOwner.isEmpty()) {
+      html.h1()._("Cannot get container logs without an app owner")._();
+    }
+    return appOwner;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
index 1a9c601..03acb33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.logaggregation;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintStream;
@@ -37,15 +35,14 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.HarFs;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import com.google.common.annotations.VisibleForTesting;
 
 public class LogCLIHelpers implements Configurable {
@@ -56,6 +53,7 @@ public class LogCLIHelpers implements Configurable {
       "Container: %s on %s";
 
   private Configuration conf;
+  private LogAggregationFileControllerFactory factory;
 
   @Private
   @VisibleForTesting
@@ -130,71 +128,11 @@ public class LogCLIHelpers implements Configurable {
   @VisibleForTesting
   public int dumpAContainerLogsForLogType(ContainerLogsRequest options,
       boolean outputFailure) throws IOException {
-    ApplicationId applicationId = options.getAppId();
-    String jobOwner = options.getAppOwner();
-    String nodeId = options.getNodeId();
-    String containerId = options.getContainerId();
-    String localDir = options.getOutputLocalDir();
-    List<String> logType = new ArrayList<String>(options.getLogTypes());
-    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
-        applicationId, jobOwner);
-    if (nodeFiles == null) {
-      return -1;
-    }
-    boolean foundContainerLogs = false;
-    while (nodeFiles.hasNext()) {
-      FileStatus thisNodeFile = nodeFiles.next();
-      String fileName = thisNodeFile.getPath().getName();
-      if (fileName.equals(applicationId + ".har")) {
-        Path p = new Path("har:///"
-            + thisNodeFile.getPath().toUri().getRawPath());
-        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
-        continue;
-      }
-      if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
-          && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader = null;
-        PrintStream out = createPrintStream(localDir, fileName, containerId);
-        try {
-          reader = new AggregatedLogFormat.LogReader(getConf(),
-              thisNodeFile.getPath());
-          if (getContainerLogsStream(containerId, reader) == null) {
-            continue;
-          }
-          String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
-              containerId, thisNodeFile.getPath().getName());
-          out.println(containerString);
-          out.println("LogAggregationType: AGGREGATED");
-          out.println(StringUtils.repeat("=", containerString.length()));
-          // We have to re-create reader object to reset the stream index
-          // after calling getContainerLogsStream which would move the stream
-          // index to the end of the log file.
-          reader =
-              new AggregatedLogFormat.LogReader(getConf(),
-                thisNodeFile.getPath());
-          if (logType == null || logType.isEmpty()) {
-            if (dumpAContainerLogs(containerId, reader, out,
-                thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
-              foundContainerLogs = true;
-            }
-          } else {
-            if (dumpAContainerLogsForALogType(containerId, reader, out,
-                thisNodeFile.getModificationTime(), logType,
-                options.getBytes()) > -1) {
-              foundContainerLogs = true;
-            }
-          }
-        } finally {
-          if (reader != null) {
-            reader.close();
-          }
-          closePrintStream(out);
-        }
-      }
-    }
-    if (!foundContainerLogs) {
+    boolean foundAnyLogs = this.getFileController(options.getAppId(),
+        options.getAppOwner()).readAggregatedLogs(options, null);
+    if (!foundAnyLogs) {
       if (outputFailure) {
-        containerLogNotFound(containerId);
+        containerLogNotFound(options.getContainerId());
       }
       return -1;
     }
@@ -204,217 +142,25 @@ public class LogCLIHelpers implements Configurable {
   @Private
   public int dumpAContainerLogsForLogTypeWithoutNodeId(
       ContainerLogsRequest options) throws IOException {
-    ApplicationId applicationId = options.getAppId();
-    String jobOwner = options.getAppOwner();
-    String containerId = options.getContainerId();
-    String localDir = options.getOutputLocalDir();
-    List<String> logType = new ArrayList<String>(options.getLogTypes());
-    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
-        applicationId, jobOwner);
-    if (nodeFiles == null) {
-      return -1;
-    }
-    boolean foundContainerLogs = false;
-    while(nodeFiles.hasNext()) {
-      FileStatus thisNodeFile = nodeFiles.next();
-      if (!thisNodeFile.getPath().getName().endsWith(
-          LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader = null;
-        PrintStream out = System.out;
-        try {
-          reader =
-              new AggregatedLogFormat.LogReader(getConf(),
-              thisNodeFile.getPath());
-          if (getContainerLogsStream(containerId, reader) == null) {
-            continue;
-          }
-          // We have to re-create reader object to reset the stream index
-          // after calling getContainerLogsStream which would move the stream
-          // index to the end of the log file.
-          reader =
-              new AggregatedLogFormat.LogReader(getConf(),
-              thisNodeFile.getPath());
-          out = createPrintStream(localDir, thisNodeFile.getPath().getName(),
-              containerId);
-          String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
-              containerId, thisNodeFile.getPath().getName());
-          out.println(containerString);
-          out.println("LogAggregationType: AGGREGATED");
-          out.println(StringUtils.repeat("=", containerString.length()));
-          if (logType == null || logType.isEmpty()) {
-            if (dumpAContainerLogs(containerId, reader, out,
-                thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
-              foundContainerLogs = true;
-            }
-          } else {
-            if (dumpAContainerLogsForALogType(containerId, reader, out,
-                thisNodeFile.getModificationTime(), logType,
-                options.getBytes()) > -1) {
-              foundContainerLogs = true;
-            }
-          }
-        } finally {
-          if (reader != null) {
-            reader.close();
-          }
-          closePrintStream(out);
-        }
-      }
-    }
-    if (!foundContainerLogs) {
-      containerLogNotFound(containerId);
+    boolean foundAnyLogs = getFileController(options.getAppId(),
+        options.getAppOwner()).readAggregatedLogs(
+        options, null);
+    if (!foundAnyLogs) {
+      containerLogNotFound(options.getContainerId());
       return -1;
     }
     return 0;
   }
 
   @Private
-  public int dumpAContainerLogs(String containerIdStr,
-      AggregatedLogFormat.LogReader reader, PrintStream out,
-      long logUploadedTime, long bytes) throws IOException {
-    DataInputStream valueStream = getContainerLogsStream(
-        containerIdStr, reader);
-
-    if (valueStream == null) {
-      return -1;
-    }
-
-    boolean foundContainerLogs = false;
-    while (true) {
-      try {
-        LogReader.readAContainerLogsForALogType(valueStream, out,
-            logUploadedTime, bytes);
-        foundContainerLogs = true;
-      } catch (EOFException eof) {
-        break;
-      }
-    }
-    if (foundContainerLogs) {
-      return 0;
-    }
-    return -1;
-  }
-
-  private DataInputStream getContainerLogsStream(String containerIdStr,
-      AggregatedLogFormat.LogReader reader) throws IOException {
-    DataInputStream valueStream;
-    LogKey key = new LogKey();
-    valueStream = reader.next(key);
-
-    while (valueStream != null && !key.toString().equals(containerIdStr)) {
-      // Next container
-      key = new LogKey();
-      valueStream = reader.next(key);
-    }
-    return valueStream;
-  }
-
-  @Private
-  public int dumpAContainerLogsForALogType(String containerIdStr,
-      AggregatedLogFormat.LogReader reader, PrintStream out,
-      long logUploadedTime, List<String> logType, long bytes)
-      throws IOException {
-    DataInputStream valueStream = getContainerLogsStream(
-        containerIdStr, reader);
-    if (valueStream == null) {
-      return -1;
-    }
-
-    boolean foundContainerLogs = false;
-    while (true) {
-      try {
-        int result = LogReader.readContainerLogsForALogType(
-            valueStream, out, logUploadedTime, logType, bytes);
-        if (result == 0) {
-          foundContainerLogs = true;
-        }
-      } catch (EOFException eof) {
-        break;
-      }
-    }
-
-    if (foundContainerLogs) {
-      return 0;
-    }
-    return -1;
-  }
-
-  @Private
   public int dumpAllContainersLogs(ContainerLogsRequest options)
       throws IOException {
-    ApplicationId appId = options.getAppId();
-    String appOwner = options.getAppOwner();
-    String localDir = options.getOutputLocalDir();
-    List<String> logTypes = new ArrayList<String>(options.getLogTypes());
-    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
-        appId, appOwner);
-    if (nodeFiles == null) {
-      return -1;
-    }
-    boolean foundAnyLogs = false;
-    while (nodeFiles.hasNext()) {
-      FileStatus thisNodeFile = nodeFiles.next();
-      if (thisNodeFile.getPath().getName().equals(appId + ".har")) {
-        Path p = new Path("har:///"
-            + thisNodeFile.getPath().toUri().getRawPath());
-        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
-        continue;
-      }
-      if (!thisNodeFile.getPath().getName()
-          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader =
-            new AggregatedLogFormat.LogReader(getConf(),
-                thisNodeFile.getPath());
-        try {
-
-          DataInputStream valueStream;
-          LogKey key = new LogKey();
-          valueStream = reader.next(key);
-
-          while (valueStream != null) {
-            PrintStream out = createPrintStream(localDir,
-                thisNodeFile.getPath().getName(), key.toString());
-            try {
-              String containerString = String.format(
-                  CONTAINER_ON_NODE_PATTERN, key,
-                  thisNodeFile.getPath().getName());
-              out.println(containerString);
-              out.println("LogAggregationType: AGGREGATED");
-              out.println(StringUtils.repeat("=", containerString.length()));
-              while (true) {
-                try {
-                  if (logTypes == null || logTypes.isEmpty()) {
-                    LogReader.readAContainerLogsForALogType(valueStream, out,
-                        thisNodeFile.getModificationTime(),
-                        options.getBytes());
-                    foundAnyLogs = true;
-                  } else {
-                    int result = LogReader.readContainerLogsForALogType(
-                        valueStream, out, thisNodeFile.getModificationTime(),
-                        logTypes, options.getBytes());
-                    if (result == 0) {
-                      foundAnyLogs = true;
-                    }
-                  }
-                } catch (EOFException eof) {
-                  break;
-                }
-              }
-            } finally {
-              closePrintStream(out);
-            }
-
-            // Next container
-            key = new LogKey();
-            valueStream = reader.next(key);
-          }
-        } finally {
-          reader.close();
-        }
-      }
-    }
+    boolean foundAnyLogs = getFileController(options.getAppId(),
+        options.getAppOwner()).readAggregatedLogs(
+        options, null);
     if (!foundAnyLogs) {
-      emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner)
+      emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(
+          conf, options.getAppId(), options.getAppOwner())
           .toString());
       return -1;
     }
@@ -425,14 +171,13 @@ public class LogCLIHelpers implements Configurable {
   public int printAContainerLogMetadata(ContainerLogsRequest options,
       PrintStream out, PrintStream err)
       throws IOException {
-    ApplicationId appId = options.getAppId();
-    String appOwner = options.getAppOwner();
     String nodeId = options.getNodeId();
     String containerIdStr = options.getContainerId();
     List<ContainerLogMeta> containersLogMeta;
     try {
-      containersLogMeta = LogToolUtils.getContainerLogMetaFromRemoteFS(
-          conf, appId, containerIdStr, nodeId, appOwner);
+      containersLogMeta = getFileController(options.getAppId(),
+          options.getAppOwner()).readAggregatedLogsMeta(
+          options);
     } catch (Exception ex) {
       err.println(ex.getMessage());
       return -1;
@@ -473,8 +218,26 @@ public class LogCLIHelpers implements Configurable {
       PrintStream out, PrintStream err) throws IOException {
     ApplicationId appId = options.getAppId();
     String appOwner = options.getAppOwner();
-    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
-        appId, appOwner);
+    LogAggregationFileController fileFormat = null;
+    try {
+      fileFormat = getFileController(appId, appOwner);
+    } catch (Exception ex) {
+      err.println(ex.getMessage());
+      return;
+    }
+    RemoteIterator<FileStatus> nodeFiles = null;
+    try {
+      nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(conf, appId,
+          appOwner, fileFormat.getRemoteRootLogDir(),
+          fileFormat.getRemoteRootLogDirSuffix());
+    } catch (FileNotFoundException fnf) {
+      logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
+          conf, appId, appOwner).toString());
+    } catch (AccessControlException | AccessDeniedException ace) {
+      logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
+          conf, appId, appOwner).toString(), appOwner,
+          ace.getMessage());
+    }
     if (nodeFiles == null) {
       return;
     }
@@ -497,44 +260,21 @@ public class LogCLIHelpers implements Configurable {
   public void printContainersList(ContainerLogsRequest options,
       PrintStream out, PrintStream err) throws IOException {
     ApplicationId appId = options.getAppId();
-    String appOwner = options.getAppOwner();
     String nodeId = options.getNodeId();
-    String nodeIdStr = (nodeId == null) ? null
-        : LogAggregationUtils.getNodeString(nodeId);
-    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
-        appId, appOwner);
-    if (nodeFiles == null) {
-      return;
-    }
     boolean foundAnyLogs = false;
-    while (nodeFiles.hasNext()) {
-      FileStatus thisNodeFile = nodeFiles.next();
-      if (nodeIdStr != null) {
-        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
-          continue;
-        }
-      }
-      if (!thisNodeFile.getPath().getName()
-          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader =
-            new AggregatedLogFormat.LogReader(getConf(),
-            thisNodeFile.getPath());
-        try {
-          DataInputStream valueStream;
-          LogKey key = new LogKey();
-          valueStream = reader.next(key);
-          while (valueStream != null) {
-            out.println(String.format(CONTAINER_ON_NODE_PATTERN, key,
-                thisNodeFile.getPath().getName()));
-            foundAnyLogs = true;
-            // Next container
-            key = new LogKey();
-            valueStream = reader.next(key);
-          }
-        } finally {
-          reader.close();
-        }
-      }
+    List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
+    try {
+      containersLogMeta = getFileController(options.getAppId(),
+          options.getAppOwner()).readAggregatedLogsMeta(
+          options);
+    } catch (Exception ex) {
+      err.println(ex.getMessage());
+    }
+    for(ContainerLogMeta logMeta : containersLogMeta) {
+      out.println(String.format(CONTAINER_ON_NODE_PATTERN,
+          logMeta.getContainerId(),
+          logMeta.getNodeId()));
+      foundAnyLogs = true;
     }
     if (!foundAnyLogs) {
       if (nodeId != null) {
@@ -547,23 +287,6 @@ public class LogCLIHelpers implements Configurable {
     }
   }
 
-  private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId,
-      String appOwner) throws IOException {
-    RemoteIterator<FileStatus> nodeFiles = null;
-    try {
-      nodeFiles = LogAggregationUtils.getRemoteNodeFileDir(
-          conf, appId, appOwner);
-    } catch (FileNotFoundException fnf) {
-      logDirNotExist(LogAggregationUtils.getRemoteAppLogDir(
-          conf, appId, appOwner).toString());
-    } catch (AccessControlException | AccessDeniedException ace) {
-      logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir(
-        conf, appId, appOwner).toString(), appOwner,
-        ace.getMessage());
-    }
-    return nodeFiles;
-  }
-
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
@@ -620,59 +343,29 @@ public class LogCLIHelpers implements Configurable {
   @Private
   public Set<String> listContainerLogs(ContainerLogsRequest options)
       throws IOException {
+    List<ContainerLogMeta> containersLogMeta;
     Set<String> logTypes = new HashSet<String>();
-    ApplicationId appId = options.getAppId();
-    String appOwner = options.getAppOwner();
-    String nodeId = options.getNodeId();
-    String containerIdStr = options.getContainerId();
-    boolean getAllContainers = (containerIdStr == null);
-    String nodeIdStr = (nodeId == null) ? null
-        : LogAggregationUtils.getNodeString(nodeId);
-    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
-        appId, appOwner);
-    if (nodeFiles == null) {
+    try {
+      containersLogMeta = getFileController(options.getAppId(),
+          options.getAppOwner()).readAggregatedLogsMeta(
+          options);
+    } catch (Exception ex) {
+      System.err.println(ex.getMessage());
       return logTypes;
     }
-    while (nodeFiles.hasNext()) {
-      FileStatus thisNodeFile = nodeFiles.next();
-      if (nodeIdStr != null) {
-        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
-          continue;
-        }
-      }
-      if (!thisNodeFile.getPath().getName()
-          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader =
-            new AggregatedLogFormat.LogReader(getConf(),
-            thisNodeFile.getPath());
-        try {
-          DataInputStream valueStream;
-          LogKey key = new LogKey();
-          valueStream = reader.next(key);
-          while (valueStream != null) {
-            if (getAllContainers || (key.toString().equals(containerIdStr))) {
-              while (true) {
-                try {
-                  String logFile = LogReader.readContainerMetaDataAndSkipData(
-                      valueStream).getFirst();
-                  logTypes.add(logFile);
-                } catch (EOFException eof) {
-                  break;
-                }
-              }
-              if (!getAllContainers) {
-                break;
-              }
-            }
-            // Next container
-            key = new LogKey();
-            valueStream = reader.next(key);
-          }
-        } finally {
-          reader.close();
-        }
+    for (ContainerLogMeta logMeta: containersLogMeta) {
+      for (PerContainerLogFileInfo fileInfo : logMeta.getContainerLogMeta()) {
+        logTypes.add(fileInfo.getFileName());
       }
     }
     return logTypes;
   }
+
+  private LogAggregationFileController getFileController(ApplicationId appId,
+      String appOwner) throws IOException {
+    if (factory == null) {
+      factory = new LogAggregationFileControllerFactory(conf);
+    }
+    return factory.getFileControllerForRead(appId, appOwner);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
index d553f86..0416566 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogToolUtils.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.yarn.logaggregation;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -27,19 +25,7 @@ import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.math3.util.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.HarFs;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
-import org.apache.hadoop.yarn.util.Times;
 
 /**
  * This class contains several utility function which could be used in different
@@ -54,81 +40,6 @@ public final class LogToolUtils {
   private LogToolUtils() {}
 
   /**
-   * Return a list of {@link ContainerLogMeta} for a container
-   * from Remote FileSystem.
-   *
-   * @param conf the configuration
-   * @param appId the applicationId
-   * @param containerIdStr the containerId
-   * @param nodeId the nodeId
-   * @param appOwner the application owner
-   * @return a list of {@link ContainerLogMeta}
-   * @throws IOException if there is no available log file
-   */
-  public static List<ContainerLogMeta> getContainerLogMetaFromRemoteFS(
-      Configuration conf, ApplicationId appId, String containerIdStr,
-      String nodeId, String appOwner) throws IOException {
-    List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
-    boolean getAllContainers = (containerIdStr == null);
-    String nodeIdStr = (nodeId == null) ? null
-        : LogAggregationUtils.getNodeString(nodeId);
-    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
-        .getRemoteNodeFileDir(conf, appId, appOwner);
-    if (nodeFiles == null) {
-      throw new IOException("There is no available log fils for "
-          + "application:" + appId);
-    }
-    while (nodeFiles.hasNext()) {
-      FileStatus thisNodeFile = nodeFiles.next();
-      if (nodeIdStr != null) {
-        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
-          continue;
-        }
-      }
-      if (!thisNodeFile.getPath().getName()
-          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader =
-            new AggregatedLogFormat.LogReader(conf,
-            thisNodeFile.getPath());
-        try {
-          DataInputStream valueStream;
-          LogKey key = new LogKey();
-          valueStream = reader.next(key);
-          while (valueStream != null) {
-            if (getAllContainers || (key.toString().equals(containerIdStr))) {
-              ContainerLogMeta containerLogMeta = new ContainerLogMeta(
-                  key.toString(), thisNodeFile.getPath().getName());
-              while (true) {
-                try {
-                  Pair<String, String> logMeta =
-                      LogReader.readContainerMetaDataAndSkipData(
-                          valueStream);
-                  containerLogMeta.addLogMeta(
-                      logMeta.getFirst(),
-                      logMeta.getSecond(),
-                      Times.format(thisNodeFile.getModificationTime()));
-                } catch (EOFException eof) {
-                  break;
-                }
-              }
-              containersLogMeta.add(containerLogMeta);
-              if (!getAllContainers) {
-                break;
-              }
-            }
-            // Next container
-            key = new LogKey();
-            valueStream = reader.next(key);
-          }
-        } finally {
-          reader.close();
-        }
-      }
-    }
-    return containersLogMeta;
-  }
-
-  /**
    * Output container log.
    * @param containerId the containerId
    * @param nodeId the nodeId
@@ -247,82 +158,4 @@ public final class LogToolUtils {
     }
   }
 
-  public static boolean outputAggregatedContainerLog(Configuration conf,
-      ApplicationId appId, String appOwner,
-      String containerId, String nodeId,
-      String logFileName, long outputSize, OutputStream os,
-      byte[] buf) throws IOException {
-    boolean findLogs = false;
-    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
-        .getRemoteNodeFileDir(conf, appId, appOwner);
-    while (nodeFiles != null && nodeFiles.hasNext()) {
-      final FileStatus thisNodeFile = nodeFiles.next();
-      String nodeName = thisNodeFile.getPath().getName();
-      if (nodeName.equals(appId + ".har")) {
-        Path p = new Path("har:///"
-            + thisNodeFile.getPath().toUri().getRawPath());
-        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
-        continue;
-      }
-      if ((nodeId == null || nodeName.contains(LogAggregationUtils
-          .getNodeString(nodeId))) && !nodeName.endsWith(
-              LogAggregationUtils.TMP_FILE_SUFFIX)) {
-        AggregatedLogFormat.LogReader reader = null;
-        try {
-          reader = new AggregatedLogFormat.LogReader(conf,
-              thisNodeFile.getPath());
-          DataInputStream valueStream;
-          LogKey key = new LogKey();
-          valueStream = reader.next(key);
-          while (valueStream != null && !key.toString()
-              .equals(containerId)) {
-            // Next container
-            key = new LogKey();
-            valueStream = reader.next(key);
-          }
-          if (valueStream == null) {
-            continue;
-          }
-          while (true) {
-            try {
-              String fileType = valueStream.readUTF();
-              String fileLengthStr = valueStream.readUTF();
-              long fileLength = Long.parseLong(fileLengthStr);
-              if (fileType.equalsIgnoreCase(logFileName)) {
-                LogToolUtils.outputContainerLog(containerId,
-                    nodeId, fileType, fileLength, outputSize,
-                    Times.format(thisNodeFile.getModificationTime()),
-                    valueStream, os, buf,
-                    ContainerLogAggregationType.AGGREGATED);
-                StringBuilder sb = new StringBuilder();
-                String endOfFile = "End of LogType:" + fileType;
-                sb.append("\n" + endOfFile + "\n");
-                sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
-                    + "\n\n");
-                byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
-                os.write(b, 0, b.length);
-                findLogs = true;
-              } else {
-                long totalSkipped = 0;
-                long currSkipped = 0;
-                while (currSkipped != -1 && totalSkipped < fileLength) {
-                  currSkipped = valueStream.skip(
-                      fileLength - totalSkipped);
-                  totalSkipped += currSkipped;
-                }
-              }
-            } catch (EOFException eof) {
-              break;
-            }
-          }
-        } finally {
-          if (reader != null) {
-            reader.close();
-          }
-        }
-      }
-    }
-    os.flush();
-    return findLogs;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index 5503f8f..87344a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -24,6 +24,10 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,7 +35,9 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -42,13 +48,18 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 
 /**
  * Base class to implement Log Aggregation File Controller.
@@ -167,6 +178,74 @@ public abstract class LogAggregationFileController {
   public abstract void postWrite(LogAggregationFileControllerContext record)
       throws Exception;
 
+  protected PrintStream createPrintStream(String localDir, String nodeId,
+      String containerId) throws IOException {
+    PrintStream out = System.out;
+    if(localDir != null && !localDir.isEmpty()) {
+      Path nodePath = new Path(localDir, LogAggregationUtils
+          .getNodeString(nodeId));
+      Files.createDirectories(Paths.get(nodePath.toString()));
+      Path containerLogPath = new Path(nodePath, containerId);
+      out = new PrintStream(containerLogPath.toString(), "UTF-8");
+    }
+    return out;
+  }
+
+  protected void closePrintStream(OutputStream out) {
+    if (out != System.out) {
+      IOUtils.closeQuietly(out);
+    }
+  }
+
+  /**
+   * Output container log.
+   * @param logRequest {@link ContainerLogsRequest}
+   * @param os the output stream
+   * @throws IOException if we can not access the log file.
+   */
+  public abstract boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+      OutputStream os) throws IOException;
+
+  /**
+   * Return a list of {@link ContainerLogMeta} for an application
+   * from Remote FileSystem.
+   *
+   * @param logRequest {@link ContainerLogsRequest}
+   * @return a list of {@link ContainerLogMeta}
+   * @throws IOException if there is no available log file
+   */
+  public abstract List<ContainerLogMeta> readAggregatedLogsMeta(
+      ContainerLogsRequest logRequest) throws IOException;
+
+  /**
+   * Render Aggregated Logs block.
+   * @param html the html
+   * @param context the ViewContext
+   */
+  public abstract void renderAggregatedLogsBlock(Block html,
+      ViewContext context);
+
+  /**
+   * Returns the owner of the application.
+   *
+   * @param aggregatedLogPath the aggregated log path.
+   * @return the application owner.
+   * @throws IOException
+   */
+  public abstract String getApplicationOwner(Path aggregatedLogPath)
+      throws IOException;
+
+  /**
+   * Returns ACLs for the application. An empty map is returned if no ACLs are
+   * found.
+   *
+   * @param aggregatedLogPath the aggregated log path.
+   * @return a map of the Application ACLs.
+   * @throws IOException
+   */
+  public abstract Map<ApplicationAccessType, String> getApplicationAcls(
+      Path aggregatedLogPath) throws IOException;
+
   /**
    * Verify and create the remote log directory.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
new file mode 100644
index 0000000..eb68028
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationHtmlBlock.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller;
+
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
+
+import com.google.inject.Inject;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
+
+/**
+ * Base class to implement Aggregated Logs Block.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+public abstract class LogAggregationHtmlBlock extends HtmlBlock {
+
+  @Inject
+  public LogAggregationHtmlBlock(ViewContext ctx) {
+    super(ctx);
+  }
+
+  protected BlockParameters verifyAndParseParameters(Block html) {
+    BlockParameters params = new BlockParameters();
+    ContainerId containerId = LogAggregationWebUtils
+        .verifyAndGetContainerId(html, $(CONTAINER_ID));
+    params.setContainerId(containerId);
+
+    NodeId nodeId = LogAggregationWebUtils
+        .verifyAndGetNodeId(html, $(NM_NODENAME));
+    params.setNodeId(nodeId);
+
+    String appOwner = LogAggregationWebUtils
+        .verifyAndGetAppOwner(html, $(APP_OWNER));
+    params.setAppOwner(appOwner);
+
+    boolean isValid = true;
+    long start = -4096;
+    try {
+      start = LogAggregationWebUtils.getLogStartIndex(
+          html, $("start"));
+    } catch (NumberFormatException ne) {
+      html.h1()._("Invalid log start value: " + $("start"))._();
+      isValid = false;
+    }
+    params.setStartIndex(start);
+
+    long end = Long.MAX_VALUE;
+    try {
+      end = LogAggregationWebUtils.getLogEndIndex(
+          html, $("end"));
+    } catch (NumberFormatException ne) {
+      html.h1()._("Invalid log end value: " + $("end"))._();
+      isValid = false;
+    }
+    params.setEndIndex(end);
+
+    if (containerId == null || nodeId == null || appOwner == null
+        || appOwner.isEmpty() || !isValid) {
+      return null;
+    }
+
+    ApplicationId appId = containerId.getApplicationAttemptId()
+        .getApplicationId();
+    params.setAppId(appId);
+
+    String logEntity = $(ENTITY_STRING);
+    if (logEntity == null || logEntity.isEmpty()) {
+      logEntity = containerId.toString();
+    }
+    params.setLogEntity(logEntity);
+
+    return params;
+  }
+
+  protected boolean checkAcls(Configuration conf, ApplicationId appId,
+      String owner, Map<ApplicationAccessType, String> appAcls,
+      String remoteUser) {
+    ApplicationACLsManager aclsManager = new ApplicationACLsManager(
+        conf);
+    aclsManager.addApplication(appId, appAcls);
+
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
+        ApplicationAccessType.VIEW_APP, owner, appId)) {
+      return false;
+    }
+    return true;
+  }
+
+  protected static class BlockParameters {
+    private ApplicationId appId;
+    private ContainerId containerId;
+    private NodeId nodeId;
+    private String appOwner;
+    private long start;
+    private long end;
+    private String logEntity;
+
+    public ApplicationId getAppId() {
+      return appId;
+    }
+
+    public void setAppId(ApplicationId appId) {
+      this.appId = appId;
+    }
+
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    public void setContainerId(ContainerId containerId) {
+      this.containerId = containerId;
+    }
+
+    public NodeId getNodeId() {
+      return nodeId;
+    }
+
+    public void setNodeId(NodeId nodeId) {
+      this.nodeId = nodeId;
+    }
+
+    public String getAppOwner() {
+      return appOwner;
+    }
+
+    public void setAppOwner(String appOwner) {
+      this.appOwner = appOwner;
+    }
+
+    public long getStartIndex() {
+      return start;
+    }
+
+    public void setStartIndex(long startIndex) {
+      this.start = startIndex;
+    }
+
+    public long getEndIndex() {
+      return end;
+    }
+
+    public void setEndIndex(long endIndex) {
+      this.end = endIndex;
+    }
+
+    public String getLogEntity() {
+      return logEntity;
+    }
+
+    public void setLogEntity(String logEntity) {
+      this.logEntity = logEntity;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
deleted file mode 100644
index 9e0c66d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationTFileController.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.logaggregation.filecontroller;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
-import org.apache.hadoop.yarn.util.Times;
-
-/**
- * The TFile log aggregation file Controller implementation.
- */
-@Private
-@Unstable
-public class LogAggregationTFileController
-    extends LogAggregationFileController {
-
-  private static final Log LOG = LogFactory.getLog(
-      LogAggregationTFileController.class);
-
-  private LogWriter writer;
-
-  public LogAggregationTFileController(){}
-
-  @Override
-  public void initInternal(Configuration conf) {
-    this.remoteRootLogDir = new Path(
-        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    this.remoteRootLogDirSuffix =
-        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
-            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-  }
-
-  @Override
-  public void initializeWriter(LogAggregationFileControllerContext context)
-      throws IOException {
-    this.writer = new LogWriter();
-    writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
-        context.getUserUgi());
-    // Write ACLs once when the writer is created.
-    writer.writeApplicationACLs(context.getAppAcls());
-    writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
-  }
-
-  @Override
-  public void closeWriter() {
-    this.writer.close();
-  }
-
-  @Override
-  public void write(LogKey logKey, LogValue logValue) throws IOException {
-    this.writer.append(logKey, logValue);
-  }
-
-  @Override
-  public void postWrite(final LogAggregationFileControllerContext record)
-      throws Exception {
-    // Before upload logs, make sure the number of existing logs
-    // is smaller than the configured NM log aggregation retention size.
-    if (record.isUploadedLogsInThisCycle() &&
-        record.isLogAggregationInRolling()) {
-      cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
-          record.getUserUgi());
-      record.increcleanupOldLogTimes();
-    }
-
-    final Path renamedPath = record.getRollingMonitorInterval() <= 0
-        ? record.getRemoteNodeLogFileForApp() : new Path(
-            record.getRemoteNodeLogFileForApp().getParent(),
-            record.getRemoteNodeLogFileForApp().getName() + "_"
-            + record.getLogUploadTimeStamp());
-    final boolean rename = record.isUploadedLogsInThisCycle();
-    try {
-      record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
-              .getFileSystem(conf);
-          if (rename) {
-            remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
-                renamedPath);
-          } else {
-            remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
-          }
-          return null;
-        }
-      });
-    } catch (Exception e) {
-      LOG.error(
-          "Failed to move temporary log file to final location: ["
-          + record.getRemoteNodeTmpLogFileForApp() + "] to ["
-          + renamedPath + "]", e);
-      throw new Exception("Log uploaded failed for Application: "
-          + record.getAppId() + " in NodeManager: "
-          + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
-          + Times.format(record.getLogUploadTimeStamp()) + "\n");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
new file mode 100644
index 0000000..d2038e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.math3.util.Pair;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
+
+/**
+ * The TFile log aggregation file Controller implementation.
+ */
+@Private
+@Unstable
+public class LogAggregationTFileController
+    extends LogAggregationFileController {
+
+  private static final Log LOG = LogFactory.getLog(
+      LogAggregationTFileController.class);
+
+  private LogWriter writer;
+  private TFileLogReader tfReader = null;
+
+  public LogAggregationTFileController(){}
+
+  @Override
+  public void initInternal(Configuration conf) {
+    this.remoteRootLogDir = new Path(
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    this.remoteRootLogDirSuffix =
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
+  }
+
+  @Override
+  public void initializeWriter(LogAggregationFileControllerContext context)
+      throws IOException {
+    this.writer = new LogWriter();
+    writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
+        context.getUserUgi());
+    // Write ACLs once when the writer is created.
+    writer.writeApplicationACLs(context.getAppAcls());
+    writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
+  }
+
+  @Override
+  public void closeWriter() {
+    this.writer.close();
+    this.writer = null;
+  }
+
+  @Override
+  public void write(LogKey logKey, LogValue logValue) throws IOException {
+    this.writer.append(logKey, logValue);
+  }
+
+  @Override
+  public void postWrite(final LogAggregationFileControllerContext record)
+      throws Exception {
+    // Before upload logs, make sure the number of existing logs
+    // is smaller than the configured NM log aggregation retention size.
+    if (record.isUploadedLogsInThisCycle() &&
+        record.isLogAggregationInRolling()) {
+      cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
+          record.getUserUgi());
+      record.increcleanupOldLogTimes();
+    }
+
+    final Path renamedPath = record.getRollingMonitorInterval() <= 0
+        ? record.getRemoteNodeLogFileForApp() : new Path(
+            record.getRemoteNodeLogFileForApp().getParent(),
+            record.getRemoteNodeLogFileForApp().getName() + "_"
+            + record.getLogUploadTimeStamp());
+    final boolean rename = record.isUploadedLogsInThisCycle();
+    try {
+      record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws Exception {
+          FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
+              .getFileSystem(conf);
+          if (rename) {
+            remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
+                renamedPath);
+          } else {
+            remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
+          }
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error(
+          "Failed to move temporary log file to final location: ["
+          + record.getRemoteNodeTmpLogFileForApp() + "] to ["
+          + renamedPath + "]", e);
+      throw new Exception("Log uploaded failed for Application: "
+          + record.getAppId() + " in NodeManager: "
+          + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
+          + Times.format(record.getLogUploadTimeStamp()) + "\n");
+    }
+  }
+
+  @Override
+  public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+      OutputStream os) throws IOException {
+    boolean findLogs = false;
+    boolean createPrintStream = (os == null);
+    ApplicationId appId = logRequest.getAppId();
+    String nodeId = logRequest.getNodeId();
+    List<String> logTypes = new ArrayList<>();
+    if (logRequest.getLogTypes() != null && !logRequest
+        .getLogTypes().isEmpty()) {
+      logTypes.addAll(logRequest.getLogTypes());
+    }
+    String containerIdStr = logRequest.getContainerId();
+    boolean getAllContainers = (containerIdStr == null
+        || containerIdStr.isEmpty());
+    long size = logRequest.getBytes();
+    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+        .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner());
+    byte[] buf = new byte[65535];
+    while (nodeFiles != null && nodeFiles.hasNext()) {
+      final FileStatus thisNodeFile = nodeFiles.next();
+      LOG.error(thisNodeFile.getPath().toString());
+      String nodeName = thisNodeFile.getPath().getName();
+      if (nodeName.equals(appId + ".har")) {
+        Path p = new Path("har:///"
+            + thisNodeFile.getPath().toUri().getRawPath());
+        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+        continue;
+      }
+      if ((nodeId == null || nodeName.contains(LogAggregationUtils
+          .getNodeString(nodeId))) && !nodeName.endsWith(
+              LogAggregationUtils.TMP_FILE_SUFFIX)) {
+        AggregatedLogFormat.LogReader reader = null;
+        try {
+          reader = new AggregatedLogFormat.LogReader(conf,
+              thisNodeFile.getPath());
+          DataInputStream valueStream;
+          LogKey key = new LogKey();
+          valueStream = reader.next(key);
+          while (valueStream != null) {
+            if (getAllContainers || (key.toString().equals(containerIdStr))) {
+              if (createPrintStream) {
+                os = createPrintStream(
+                    logRequest.getOutputLocalDir(),
+                    thisNodeFile.getPath().getName(), key.toString());
+              }
+              try {
+                while (true) {
+                  try {
+                    String fileType = valueStream.readUTF();
+                    String fileLengthStr = valueStream.readUTF();
+                    long fileLength = Long.parseLong(fileLengthStr);
+                    if (logTypes == null || logTypes.isEmpty() ||
+                        logTypes.contains(fileType)) {
+                      LogToolUtils.outputContainerLog(key.toString(),
+                          nodeName, fileType, fileLength, size,
+                          Times.format(thisNodeFile.getModificationTime()),
+                          valueStream, os, buf,
+                          ContainerLogAggregationType.AGGREGATED);
+                      StringBuilder sb = new StringBuilder();
+                      String endOfFile = "End of LogType:" + fileType;
+                      sb.append("\n" + endOfFile + "\n");
+                      sb.append(StringUtils.repeat("*", endOfFile.length() + 50)
+                          + "\n\n");
+                      byte[] b = sb.toString().getBytes(
+                          Charset.forName("UTF-8"));
+                      os.write(b, 0, b.length);
+                      findLogs = true;
+                    } else {
+                      long totalSkipped = 0;
+                      long currSkipped = 0;
+                      while (currSkipped != -1 && totalSkipped < fileLength) {
+                        currSkipped = valueStream.skip(
+                            fileLength - totalSkipped);
+                        totalSkipped += currSkipped;
+                      }
+                    }
+                  } catch (EOFException eof) {
+                    break;
+                  }
+                }
+              } finally {
+                os.flush();
+                if (createPrintStream) {
+                  closePrintStream(os);
+                }
+              }
+              if (!getAllContainers) {
+                break;
+              }
+            }
+            // Next container
+            key = new LogKey();
+            valueStream = reader.next(key);
+          }
+        } finally {
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      }
+    }
+    return findLogs;
+  }
+
+  @Override
+  public List<ContainerLogMeta> readAggregatedLogsMeta(
+      ContainerLogsRequest logRequest) throws IOException {
+    List<ContainerLogMeta> containersLogMeta = new ArrayList<>();
+    String containerIdStr = logRequest.getContainerId();
+    String nodeId = logRequest.getNodeId();
+    ApplicationId appId = logRequest.getAppId();
+    String appOwner = logRequest.getAppOwner();
+    boolean getAllContainers = (containerIdStr == null);
+    String nodeIdStr = (nodeId == null) ? null
+        : LogAggregationUtils.getNodeString(nodeId);
+    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+        .getRemoteNodeFileDir(conf, appId, appOwner);
+    if (nodeFiles == null) {
+      throw new IOException("There is no available log fils for "
+          + "application:" + appId);
+    }
+    while (nodeFiles.hasNext()) {
+      FileStatus thisNodeFile = nodeFiles.next();
+      if (nodeIdStr != null) {
+        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
+          continue;
+        }
+      }
+      if (!thisNodeFile.getPath().getName()
+          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+        AggregatedLogFormat.LogReader reader =
+            new AggregatedLogFormat.LogReader(conf,
+            thisNodeFile.getPath());
+        try {
+          DataInputStream valueStream;
+          LogKey key = new LogKey();
+          valueStream = reader.next(key);
+          while (valueStream != null) {
+            if (getAllContainers || (key.toString().equals(containerIdStr))) {
+              ContainerLogMeta containerLogMeta = new ContainerLogMeta(
+                  key.toString(), thisNodeFile.getPath().getName());
+              while (true) {
+                try {
+                  Pair<String, String> logMeta =
+                      LogReader.readContainerMetaDataAndSkipData(
+                          valueStream);
+                  containerLogMeta.addLogMeta(
+                      logMeta.getFirst(),
+                      logMeta.getSecond(),
+                      Times.format(thisNodeFile.getModificationTime()));
+                } catch (EOFException eof) {
+                  break;
+                }
+              }
+              containersLogMeta.add(containerLogMeta);
+              if (!getAllContainers) {
+                break;
+              }
+            }
+            // Next container
+            key = new LogKey();
+            valueStream = reader.next(key);
+          }
+        } finally {
+          reader.close();
+        }
+      }
+    }
+    return containersLogMeta;
+  }
+
+  @Override
+  public void renderAggregatedLogsBlock(Block html, ViewContext context) {
+    TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock(
+        context, conf);
+    block.render(html);
+  }
+
+  @Override
+  public String getApplicationOwner(Path aggregatedLog) throws IOException {
+    createTFileLogReader(aggregatedLog);
+    return this.tfReader.getLogReader().getApplicationOwner();
+  }
+
+  @Override
+  public Map<ApplicationAccessType, String> getApplicationAcls(
+      Path aggregatedLog) throws IOException {
+    createTFileLogReader(aggregatedLog);
+    return this.tfReader.getLogReader().getApplicationAcls();
+  }
+
+  private void createTFileLogReader(Path aggregatedLog) throws IOException {
+    if (this.tfReader == null || !this.tfReader.getAggregatedLogPath()
+        .equals(aggregatedLog)) {
+      LogReader logReader = new LogReader(conf, aggregatedLog);
+      this.tfReader = new TFileLogReader(logReader, aggregatedLog);
+    }
+  }
+
+  private static class TFileLogReader {
+    private LogReader logReader;
+    private Path aggregatedLogPath;
+
+    TFileLogReader(LogReader logReader, Path aggregatedLogPath) {
+      this.setLogReader(logReader);
+      this.setAggregatedLogPath(aggregatedLogPath);
+    }
+    public LogReader getLogReader() {
+      return logReader;
+    }
+    public void setLogReader(LogReader logReader) {
+      this.logReader = logReader;
+    }
+    public Path getAggregatedLogPath() {
+      return aggregatedLogPath;
+    }
+    public void setAggregatedLogPath(Path aggregatedLogPath) {
+      this.aggregatedLogPath = aggregatedLogPath;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
new file mode 100644
index 0000000..496db25
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
+
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
+
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.HarFs;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationHtmlBlock;
+import org.apache.hadoop.yarn.util.Times;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
+
+/**
+ * The Aggregated Logs Block implementation for TFile.
+ *
+ * <p>Renders a single container's aggregated logs as HTML by scanning the
+ * per-node TFile aggregated log files for the application.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
+
+  private final Configuration conf;
+
+  @Inject
+  public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf) {
+    super(ctx);
+    this.conf = conf;
+  }
+
+  // Validates the web params (app, container, node, owner, range), then
+  // walks the remote node files, enforces view ACLs, and renders the logs.
+  @Override
+  protected void render(Block html) {
+
+    BlockParameters params = verifyAndParseParameters(html);
+    if (params == null) {
+      return;
+    }
+
+    RemoteIterator<FileStatus> nodeFiles;
+    try {
+      nodeFiles = LogAggregationUtils
+          .getRemoteNodeFileDir(conf, params.getAppId(),
+              params.getAppOwner());
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (Exception ex) {
+      // Checked failure while listing the remote dir: report "no logs".
+      html.h1("No logs available for container "
+          + params.getContainerId().toString());
+      return;
+    }
+
+    NodeId nodeId = params.getNodeId();
+    String logEntity = params.getLogEntity();
+    ApplicationId appId = params.getAppId();
+    ContainerId containerId = params.getContainerId();
+    long start = params.getStartIndex();
+    long end = params.getEndIndex();
+
+    boolean foundLog = false;
+    String desiredLogType = $(CONTAINER_LOG_TYPE);
+    try {
+      while (nodeFiles.hasNext()) {
+        AggregatedLogFormat.LogReader reader = null;
+        try {
+          FileStatus thisNodeFile = nodeFiles.next();
+          // Logs archived into a HAR: re-point iteration at the archive.
+          if (thisNodeFile.getPath().getName().equals(
+              params.getAppId() + ".har")) {
+            Path p = new Path("har:///"
+                + thisNodeFile.getPath().toUri().getRawPath());
+            nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+            continue;
+          }
+          // Skip files for other nodes and in-progress (.tmp) uploads.
+          if (!thisNodeFile.getPath().getName()
+              .contains(LogAggregationUtils.getNodeString(nodeId))
+              || thisNodeFile.getPath().getName()
+                  .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+            continue;
+          }
+          long logUploadedTime = thisNodeFile.getModificationTime();
+          reader = new AggregatedLogFormat.LogReader(
+              conf, thisNodeFile.getPath());
+
+          String owner = null;
+          Map<ApplicationAccessType, String> appAcls = null;
+          try {
+            owner = reader.getApplicationOwner();
+            appAcls = reader.getApplicationAcls();
+          } catch (IOException e) {
+            LOG.error("Error getting logs for " + logEntity, e);
+            continue;
+          }
+          String remoteUser = request().getRemoteUser();
+
+          // Enforce VIEW_APP ACLs before rendering any log content.
+          if (!checkAcls(conf, appId, owner, appAcls, remoteUser)) {
+            html.h1()._("User [" + remoteUser
+                + "] is not authorized to view the logs for " + logEntity
+                + " in log file [" + thisNodeFile.getPath().getName() + "]")
+                ._();
+            LOG.error("User [" + remoteUser
+                + "] is not authorized to view the logs for " + logEntity);
+            continue;
+          }
+
+          AggregatedLogFormat.ContainerLogsReader logReader = reader
+              .getContainerLogsReader(containerId);
+          if (logReader == null) {
+            continue;
+          }
+
+          foundLog = readContainerLogs(html, logReader, start, end,
+              desiredLogType, logUploadedTime);
+        } catch (IOException ex) {
+          LOG.error("Error getting logs for " + logEntity, ex);
+          continue;
+        } finally {
+          if (reader != null) {
+            reader.close();
+          }
+        }
+      }
+      if (!foundLog) {
+        if (desiredLogType.isEmpty()) {
+          html.h1("No logs available for container "
+              + containerId.toString());
+        } else {
+          html.h1("Unable to locate '" + desiredLogType
+              + "' log for container " + containerId.toString());
+        }
+      }
+    } catch (IOException e) {
+      html.h1()._("Error getting logs for " + logEntity)._();
+      LOG.error("Error getting logs for " + logEntity, e);
+    }
+  }
+
+  /**
+   * Renders the logs of one container into the page, restricted to the
+   * byte range [startIndex, endIndex) and optionally to one log type.
+   * Negative indices are interpreted as offsets from the end of the log.
+   *
+   * @return true if at least one log was rendered.
+   */
+  private boolean readContainerLogs(Block html,
+      AggregatedLogFormat.ContainerLogsReader logReader, long startIndex,
+      long endIndex, String desiredLogType, long logUpLoadTime)
+      throws IOException {
+    int bufferSize = 65536;
+    char[] cbuf = new char[bufferSize];
+
+    boolean foundLog = false;
+    String logType = logReader.nextLog();
+    while (logType != null) {
+      if (desiredLogType == null || desiredLogType.isEmpty()
+          || desiredLogType.equals(logType)) {
+        long logLength = logReader.getCurrentLogLength();
+        if (foundLog) {
+          html.pre()._("\n\n")._();
+        }
+
+        html.p()._("Log Type: " + logType)._();
+        html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
+        html.p()._("Log Length: " + Long.toString(logLength))._();
+
+        // Clamp [start, end] into [0, logLength] with end >= start;
+        // negative values count back from the end of the log.
+        long start = startIndex < 0
+            ? logLength + startIndex : startIndex;
+        start = start < 0 ? 0 : start;
+        start = start > logLength ? logLength : start;
+        long end = endIndex < 0
+            ? logLength + endIndex : endIndex;
+        end = end < 0 ? 0 : end;
+        end = end > logLength ? logLength : end;
+        end = end < start ? start : end;
+
+        long toRead = end - start;
+        if (toRead < logLength) {
+          html.p()._("Showing " + toRead + " bytes of " + logLength
+            + " total. Click ").a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
+                $(ENTITY_STRING), $(APP_OWNER),
+                logType, "?start=0"), "here").
+            _(" for the full log.")._();
+        }
+
+        // Advance to `start`; skip() may return 0, so fall back to a
+        // single-byte read to guarantee progress (or detect EOF).
+        long totalSkipped = 0;
+        while (totalSkipped < start) {
+          long ret = logReader.skip(start - totalSkipped);
+          if (ret == 0) {
+            //Read one byte
+            int nextByte = logReader.read();
+            // Check if we have reached EOF
+            if (nextByte == -1) {
+              throw new IOException("Premature EOF from container log");
+            }
+            ret = 1;
+          }
+          totalSkipped += ret;
+        }
+
+        int len = 0;
+        int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+        PRE<Hamlet> pre = html.pre();
+
+        // Stream the selected byte range into a <pre> block.
+        while (toRead > 0
+            && (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
+          pre._(new String(cbuf, 0, len));
+          toRead = toRead - len;
+          currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
+        }
+
+        pre._();
+        foundLog = true;
+      }
+
+      logType = logReader.nextLog();
+    }
+
+    return foundLog;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
new file mode 100644
index 0000000..b2e91ab
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains the TFile-based log aggregation file controller implementation.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.logaggregation.filecontroller.tfile;
+import org.apache.hadoop.classification.InterfaceAudience;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message