hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From junping...@apache.org
Subject [1/2] hadoop git commit: YARN-6877. Create an abstract log reader for extendability. Contributed by Xuan Gong.
Date Fri, 01 Sep 2017 10:02:51 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 2442a8d71 -> 119220b88


http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
index 1da6e23..1c0b990 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
@@ -20,34 +20,18 @@ package org.apache.hadoop.yarn.webapp.log;
 
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_OWNER;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_ID;
-import static org.apache.hadoop.yarn.webapp.YarnWebParams.CONTAINER_LOG_TYPE;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.ENTITY_STRING;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.NM_NODENAME;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.HarFs;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
-import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
-import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Times;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.PRE;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationWebUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 
 import com.google.inject.Inject;
@@ -56,20 +40,40 @@ import com.google.inject.Inject;
 public class AggregatedLogsBlock extends HtmlBlock {
 
   private final Configuration conf;
+  private final LogAggregationFileControllerFactory factory;
 
   @Inject
   AggregatedLogsBlock(Configuration conf) {
     this.conf = conf;
+    factory = new LogAggregationFileControllerFactory(conf);
   }
 
   @Override
   protected void render(Block html) {
-    ContainerId containerId = verifyAndGetContainerId(html);
-    NodeId nodeId = verifyAndGetNodeId(html);
-    String appOwner = verifyAndGetAppOwner(html);
-    LogLimits logLimits = verifyAndGetLogLimits(html);
+    ContainerId containerId = LogAggregationWebUtils
+        .verifyAndGetContainerId(html, $(CONTAINER_ID));
+    NodeId nodeId = LogAggregationWebUtils
+        .verifyAndGetNodeId(html, $(NM_NODENAME));
+    String appOwner = LogAggregationWebUtils
+        .verifyAndGetAppOwner(html, $(APP_OWNER));
+    boolean isValid = true;
+    try {
+      LogAggregationWebUtils.getLogStartIndex(
+          html, $("start"));
+    } catch (NumberFormatException ne) {
+      html.h1()._("Invalid log start value: " + $("start"))._();
+      isValid = false;
+    }
+    try {
+      LogAggregationWebUtils.getLogEndIndex(
+          html, $("end"));
+    } catch (NumberFormatException ne) {
+      html.h1()._("Invalid log end value: " + $("end"))._();
+      isValid = false;
+    }
+
     if (containerId == null || nodeId == null || appOwner == null
-        || appOwner.isEmpty() || logLimits == null) {
+        || appOwner.isEmpty() || !isValid) {
       return;
     }
 
@@ -94,21 +98,11 @@ public class AggregatedLogsBlock extends HtmlBlock {
       return;
     }
 
-    Path remoteRootLogDir = new Path(conf.get(
-        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
-        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
-    Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
-        remoteRootLogDir, applicationId, appOwner,
-        LogAggregationUtils.getRemoteNodeLogDirSuffix(conf));
-    RemoteIterator<FileStatus> nodeFiles;
+    LogAggregationFileController fileController;
     try {
-      Path qualifiedLogDir =
-          FileContext.getFileContext(conf).makeQualified(
-            remoteAppDir);
-      nodeFiles =
-          FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
-            .listStatus(remoteAppDir);
-    } catch (FileNotFoundException fnf) {
+      fileController = this.factory.getFileControllerForRead(
+          applicationId, appOwner);
+    } catch (Exception fnf) {
       html.h1()
           ._("Logs not available for " + logEntity
               + ". Aggregation may not be complete, "
@@ -119,250 +113,9 @@ public class AggregatedLogsBlock extends HtmlBlock {
             ._();
       }
       return;
-    } catch (Exception ex) {
-      html.h1()
-          ._("Error getting logs at " + nodeId)._();
-      return;
-    }
-
-    boolean foundLog = false;
-    String desiredLogType = $(CONTAINER_LOG_TYPE);
-    try {
-      while (nodeFiles.hasNext()) {
-        AggregatedLogFormat.LogReader reader = null;
-        try {
-          FileStatus thisNodeFile = nodeFiles.next();
-          if (thisNodeFile.getPath().getName().equals(applicationId + ".har")) {
-            Path p = new Path("har:///"
-                + thisNodeFile.getPath().toUri().getRawPath());
-            nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
-            continue;
-          }
-          if (!thisNodeFile.getPath().getName()
-            .contains(LogAggregationUtils.getNodeString(nodeId))
-              || thisNodeFile.getPath().getName()
-                .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
-            continue;
-          }
-          long logUploadedTime = thisNodeFile.getModificationTime();
-          reader =
-              new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
-
-          String owner = null;
-          Map<ApplicationAccessType, String> appAcls = null;
-          try {
-            owner = reader.getApplicationOwner();
-            appAcls = reader.getApplicationAcls();
-          } catch (IOException e) {
-            LOG.error("Error getting logs for " + logEntity, e);
-            continue;
-          }
-          ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
-          aclsManager.addApplication(applicationId, appAcls);
-
-          String remoteUser = request().getRemoteUser();
-          UserGroupInformation callerUGI = null;
-          if (remoteUser != null) {
-            callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
-          }
-          if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
-              ApplicationAccessType.VIEW_APP, owner, applicationId)) {
-            html.h1()
-                ._("User [" + remoteUser
-                    + "] is not authorized to view the logs for " + logEntity
-                    + " in log file [" + thisNodeFile.getPath().getName() + "]")._();
-            LOG.error("User [" + remoteUser
-              + "] is not authorized to view the logs for " + logEntity);
-            continue;
-          }
-
-          AggregatedLogFormat.ContainerLogsReader logReader = reader
-            .getContainerLogsReader(containerId);
-          if (logReader == null) {
-            continue;
-          }
-
-          foundLog = readContainerLogs(html, logReader, logLimits,
-              desiredLogType, logUploadedTime);
-        } catch (IOException ex) {
-          LOG.error("Error getting logs for " + logEntity, ex);
-          continue;
-        } finally {
-          if (reader != null)
-            reader.close();
-        }
-      }
-      if (!foundLog) {
-        if (desiredLogType.isEmpty()) {
-          html.h1("No logs available for container " + containerId.toString());
-        } else {
-          html.h1("Unable to locate '" + desiredLogType
-              + "' log for container " + containerId.toString());
-        }
-      }
-    } catch (IOException e) {
-      html.h1()._("Error getting logs for " + logEntity)._();
-      LOG.error("Error getting logs for " + logEntity, e);
-    }
-  }
-
-  private boolean readContainerLogs(Block html,
-      AggregatedLogFormat.ContainerLogsReader logReader, LogLimits logLimits,
-      String desiredLogType, long logUpLoadTime) throws IOException {
-    int bufferSize = 65536;
-    char[] cbuf = new char[bufferSize];
-
-    boolean foundLog = false;
-    String logType = logReader.nextLog();
-    while (logType != null) {
-      if (desiredLogType == null || desiredLogType.isEmpty()
-          || desiredLogType.equals(logType)) {
-        long logLength = logReader.getCurrentLogLength();
-        if (foundLog) {
-          html.pre()._("\n\n")._();
-        }
-
-        html.p()._("Log Type: " + logType)._();
-        html.p()._("Log Upload Time: " + Times.format(logUpLoadTime))._();
-        html.p()._("Log Length: " + Long.toString(logLength))._();
-
-        long start = logLimits.start < 0
-            ? logLength + logLimits.start : logLimits.start;
-        start = start < 0 ? 0 : start;
-        start = start > logLength ? logLength : start;
-        long end = logLimits.end < 0
-            ? logLength + logLimits.end : logLimits.end;
-        end = end < 0 ? 0 : end;
-        end = end > logLength ? logLength : end;
-        end = end < start ? start : end;
-
-        long toRead = end - start;
-        if (toRead < logLength) {
-            html.p()._("Showing " + toRead + " bytes of " + logLength
-                + " total. Click ")
-                .a(url("logs", $(NM_NODENAME), $(CONTAINER_ID),
-                    $(ENTITY_STRING), $(APP_OWNER),
-                    logType, "?start=0"), "here").
-                    _(" for the full log.")._();
-        }
-
-        long totalSkipped = 0;
-        while (totalSkipped < start) {
-          long ret = logReader.skip(start - totalSkipped);
-          if (ret == 0) {
-            //Read one byte
-            int nextByte = logReader.read();
-            // Check if we have reached EOF
-            if (nextByte == -1) {
-              throw new IOException( "Premature EOF from container log");
-            }
-            ret = 1;
-          }
-          totalSkipped += ret;
-        }
-
-        int len = 0;
-        int currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-        PRE<Hamlet> pre = html.pre();
-
-        while (toRead > 0
-            && (len = logReader.read(cbuf, 0, currentToRead)) > 0) {
-          pre._(new String(cbuf, 0, len));
-          toRead = toRead - len;
-          currentToRead = toRead > bufferSize ? bufferSize : (int) toRead;
-        }
-
-        pre._();
-        foundLog = true;
-      }
-
-      logType = logReader.nextLog();
-    }
-
-    return foundLog;
-  }
-
-  private ContainerId verifyAndGetContainerId(Block html) {
-    String containerIdStr = $(CONTAINER_ID);
-    if (containerIdStr == null || containerIdStr.isEmpty()) {
-      html.h1()._("Cannot get container logs without a ContainerId")._();
-      return null;
-    }
-    ContainerId containerId = null;
-    try {
-      containerId = ContainerId.fromString(containerIdStr);
-    } catch (IllegalArgumentException e) {
-      html.h1()
-          ._("Cannot get container logs for invalid containerId: "
-              + containerIdStr)._();
-      return null;
-    }
-    return containerId;
-  }
-
-  private NodeId verifyAndGetNodeId(Block html) {
-    String nodeIdStr = $(NM_NODENAME);
-    if (nodeIdStr == null || nodeIdStr.isEmpty()) {
-      html.h1()._("Cannot get container logs without a NodeId")._();
-      return null;
-    }
-    NodeId nodeId = null;
-    try {
-      nodeId = NodeId.fromString(nodeIdStr);
-    } catch (IllegalArgumentException e) {
-      html.h1()._("Cannot get container logs. Invalid nodeId: " + nodeIdStr)
-          ._();
-      return null;
-    }
-    return nodeId;
-  }
-  
-  private String verifyAndGetAppOwner(Block html) {
-    String appOwner = $(APP_OWNER);
-    if (appOwner == null || appOwner.isEmpty()) {
-      html.h1()._("Cannot get container logs without an app owner")._();
-    }
-    return appOwner;
-  }
-
-  private static class LogLimits {
-    long start;
-    long end;
-  }
-
-  private LogLimits verifyAndGetLogLimits(Block html) {
-    long start = -4096;
-    long end = Long.MAX_VALUE;
-    boolean isValid = true;
-
-    String startStr = $("start");
-    if (startStr != null && !startStr.isEmpty()) {
-      try {
-        start = Long.parseLong(startStr);
-      } catch (NumberFormatException e) {
-        isValid = false;
-        html.h1()._("Invalid log start value: " + startStr)._();
-      }
-    }
-
-    String endStr = $("end");
-    if (endStr != null && !endStr.isEmpty()) {
-      try {
-        end = Long.parseLong(endStr);
-      } catch (NumberFormatException e) {
-        isValid = false;
-        html.h1()._("Invalid log end value: " + endStr)._();
-      }
-    }
-
-    if (!isValid) {
-      return null;
     }
 
-    LogLimits limits = new LogLimits();
-    limits.start = start;
-    limits.end = end;
-    return limits;
+    fileController.renderAggregatedLogsBlock(html, this.context());
   }
 
   private String getApplicationLogURL(ApplicationId applicationId) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0728a54..5dceedc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1148,7 +1148,7 @@
   <property>
     <description>Class that supports TFile read and write operations.</description>
     <name>yarn.log-aggregation.file-controller.TFile.class</name>
-    <value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
+    <value>org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController</value>
   </property>
 
   <property>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index 3dd7de3..bee34e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -48,7 +48,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.TFileAggregatedLogsBlock;
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
 import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
 import org.apache.hadoop.yarn.webapp.view.BlockForTest;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
@@ -56,6 +58,9 @@ import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
 import org.junit.Test;
 
 import static org.mockito.Mockito.*;
+
+import com.google.inject.Inject;
+
 import static org.junit.Assert.*;
 
 /**
@@ -77,12 +82,14 @@ public class TestAggregatedLogsBlock {
 
     writeLog(configuration, "owner");
 
-    AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
-        configuration, "owner", "container_0_0001_01_000001");
+
     ByteArrayOutputStream data = new ByteArrayOutputStream();
     PrintWriter printWriter = new PrintWriter(data);
     HtmlBlock html = new HtmlBlockForTest();
     HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
+    TFileAggregatedLogsBlockForTest aggregatedBlock
+        = getTFileAggregatedLogsBlockForTest(configuration, "owner",
+            "container_0_0001_01_000001", "localhost:1234");
     aggregatedBlock.render(block);
 
     block.getWriter().flush();
@@ -158,12 +165,13 @@ public class TestAggregatedLogsBlock {
 
     writeLog(configuration, "admin");
 
-    AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
-        configuration, "admin", "container_0_0001_01_000001");
     ByteArrayOutputStream data = new ByteArrayOutputStream();
     PrintWriter printWriter = new PrintWriter(data);
     HtmlBlock html = new HtmlBlockForTest();
     HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
+    TFileAggregatedLogsBlockForTest aggregatedBlock
+        = getTFileAggregatedLogsBlockForTest(configuration, "admin",
+            "container_0_0001_01_000001", "localhost:1234");
     aggregatedBlock.render(block);
 
     block.getWriter().flush();
@@ -191,13 +199,13 @@ public class TestAggregatedLogsBlock {
         "/application_1440536969523_0001.har";
     FileUtils.copyDirectory(new File(harUrl.getPath()), new File(path));
 
-    AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
-        configuration, "admin",
-        "container_1440536969523_0001_01_000001", "host1:1111");
     ByteArrayOutputStream data = new ByteArrayOutputStream();
     PrintWriter printWriter = new PrintWriter(data);
     HtmlBlock html = new HtmlBlockForTest();
     HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
+    TFileAggregatedLogsBlockForTest aggregatedBlock
+        = getTFileAggregatedLogsBlockForTest(configuration, "admin",
+            "container_1440536969523_0001_01_000001", "host1:1111");
     aggregatedBlock.render(block);
 
     block.getWriter().flush();
@@ -206,7 +214,7 @@ public class TestAggregatedLogsBlock {
     assertTrue(out.contains("Hello stdout"));
     assertTrue(out.contains("Hello syslog"));
 
-    aggregatedBlock = getAggregatedLogsBlockForTest(
+    aggregatedBlock = getTFileAggregatedLogsBlockForTest(
         configuration, "admin",
         "container_1440536969523_0001_01_000002", "host2:2222");
     data = new ByteArrayOutputStream();
@@ -237,12 +245,13 @@ public class TestAggregatedLogsBlock {
     }
     writeLog(configuration, "admin");
 
-    AggregatedLogsBlockForTest aggregatedBlock = getAggregatedLogsBlockForTest(
-        configuration, "admin", "container_0_0001_01_000001");
     ByteArrayOutputStream data = new ByteArrayOutputStream();
     PrintWriter printWriter = new PrintWriter(data);
     HtmlBlock html = new HtmlBlockForTest();
     HtmlBlock.Block block = new BlockForTest(html, printWriter, 10, false);
+    TFileAggregatedLogsBlockForTest aggregatedBlock
+        = getTFileAggregatedLogsBlockForTest(configuration, "admin",
+            "container_0_0001_01_000001", "localhost:1234");
     aggregatedBlock.render(block);
 
     block.getWriter().flush();
@@ -250,8 +259,7 @@ public class TestAggregatedLogsBlock {
     assertTrue(out.contains("No logs available for container container_0_0001_01_000001"));
 
   }
-  
-  
+
   private Configuration getConfiguration() {
     Configuration configuration = new YarnConfiguration();
     configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
@@ -267,6 +275,28 @@ public class TestAggregatedLogsBlock {
         "localhost:1234");
   }
 
+  private TFileAggregatedLogsBlockForTest getTFileAggregatedLogsBlockForTest(
+      Configuration configuration, String user, String containerId,
+      String nodeName) {
+    HttpServletRequest request = mock(HttpServletRequest.class);
+    when(request.getRemoteUser()).thenReturn(user);
+    ViewContext mockContext = mock(ViewContext.class);
+    TFileAggregatedLogsBlockForTest aggregatedBlock
+        = new TFileAggregatedLogsBlockForTest(mockContext,
+            configuration);
+    aggregatedBlock.setRequest(request);
+    aggregatedBlock.moreParams().put(YarnWebParams.CONTAINER_ID,
+        containerId);
+    aggregatedBlock.moreParams().put(YarnWebParams.NM_NODENAME,
+        nodeName);
+    aggregatedBlock.moreParams().put(YarnWebParams.APP_OWNER, user);
+    aggregatedBlock.moreParams().put("start", "");
+    aggregatedBlock.moreParams().put("end", "");
+    aggregatedBlock.moreParams().put(YarnWebParams.ENTITY_STRING, "entity");
+    return aggregatedBlock;
+  }
+
+
   private AggregatedLogsBlockForTest getAggregatedLogsBlockForTest(
       Configuration configuration, String user, String containerId,
       String nodeName) {
@@ -340,4 +370,32 @@ public class TestAggregatedLogsBlock {
     writer.close();
   }
 
+  private static class TFileAggregatedLogsBlockForTest
+      extends TFileAggregatedLogsBlock {
+
+    private Map<String, String> params = new HashMap<String, String>();
+    private HttpServletRequest request;
+
+    @Inject
+    TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) {
+      super(ctx, conf);
+    }
+
+    public void render(Block html) {
+      super.render(html);
+    }
+
+    @Override
+    public Map<String, String> moreParams() {
+      return params;
+    }
+
+    public HttpServletRequest request() {
+      return request;
+    }
+
+    public  void setRequest(HttpServletRequest request) {
+      this.request = request;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
index 45f18c1..2d0864a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/TestLogAggregationFileControllerFactory.java
@@ -23,19 +23,27 @@ import static org.junit.Assert.*;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Writer;
 import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
-import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
+import org.apache.hadoop.yarn.webapp.View.ViewContext;
+import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block;
 import org.junit.Test;
 
 /**
@@ -167,5 +175,34 @@ public class TestLogAggregationFileControllerFactory {
         throws IOException {
       // Do Nothing
     }
+
+    @Override
+    public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
+        OutputStream os) throws IOException {
+      return false;
+    }
+
+    @Override
+    public List<ContainerLogMeta> readAggregatedLogsMeta(
+        ContainerLogsRequest logRequest) throws IOException {
+      return null;
+    }
+
+    @Override
+    public void renderAggregatedLogsBlock(Block html, ViewContext context) {
+      // DO NOTHING
+    }
+
+    @Override
+    public String getApplicationOwner(Path aggregatedLogPath)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public Map<ApplicationAccessType, String> getApplicationAcls(
+        Path aggregatedLogPath) throws IOException {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
index 3ec750a..6a1f786 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
@@ -23,6 +23,7 @@ import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -53,8 +54,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
-import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
 import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
@@ -91,12 +93,14 @@ public class AHSWebServices extends WebServices {
   private static final Joiner JOINER = Joiner.on("");
   private static final Joiner DOT_JOINER = Joiner.on(". ");
   private final Configuration conf;
+  private final LogAggregationFileControllerFactory factory;
 
   @Inject
   public AHSWebServices(ApplicationBaseProtocol appBaseProt,
       Configuration conf) {
     super(appBaseProt);
     this.conf = conf;
+    this.factory = new LogAggregationFileControllerFactory(conf);
   }
 
   @GET
@@ -516,9 +520,17 @@ public class AHSWebServices extends WebServices {
       @Override
       public void write(OutputStream os) throws IOException,
           WebApplicationException {
-        byte[] buf = new byte[65535];
-        boolean findLogs = LogToolUtils.outputAggregatedContainerLog(conf,
-            appId, appOwner, containerIdStr, nodeId, logFile, bytes, os, buf);
+        ContainerLogsRequest request = new ContainerLogsRequest();
+        request.setAppId(appId);
+        request.setAppOwner(appOwner);
+        request.setContainerId(containerIdStr);
+        request.setBytes(bytes);
+        request.setNodeId(nodeId);
+        Set<String> logTypes = new HashSet<>();
+        logTypes.add(logFile);
+        request.setLogTypes(logTypes);
+        boolean findLogs = factory.getFileControllerForRead(appId, appOwner)
+            .readAggregatedLogs(request, os);
         if (!findLogs) {
           os.write(("Can not find logs for container:"
               + containerIdStr).getBytes(Charset.forName("UTF-8")));
@@ -549,9 +561,14 @@ public class AHSWebServices extends WebServices {
       final String nodeId, final String containerIdStr,
       boolean emptyLocalContainerLogMeta) {
     try {
-      List<ContainerLogMeta> containerLogMeta = LogToolUtils
-          .getContainerLogMetaFromRemoteFS(conf, appId, containerIdStr,
-              nodeId, appOwner);
+      ContainerLogsRequest request = new ContainerLogsRequest();
+      request.setAppId(appId);
+      request.setAppOwner(appOwner);
+      request.setContainerId(containerIdStr);
+      request.setNodeId(nodeId);
+      List<ContainerLogMeta> containerLogMeta = factory
+          .getFileControllerForRead(appId, appOwner)
+          .readAggregatedLogsMeta(request);
       if (containerLogMeta.isEmpty()) {
         throw new NotFoundException(
             "Can not get log meta for container: " + containerIdStr);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 51c63c4..4ac150a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
-import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
index 2fa3e74..60905d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
@@ -23,8 +23,10 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +56,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta;
+import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType;
 import org.apache.hadoop.yarn.logaggregation.LogToolUtils;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -89,6 +93,7 @@ public class NMWebServices {
   private static RecordFactory recordFactory = RecordFactoryProvider
       .getRecordFactory(null);
   private final String redirectWSUrl;
+  private final LogAggregationFileControllerFactory factory;
 
   private @javax.ws.rs.core.Context 
     HttpServletRequest request;
@@ -107,6 +112,8 @@ public class NMWebServices {
     this.webapp = webapp;
     this.redirectWSUrl = this.nmContext.getConf().get(
         YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL);
+    this.factory = new LogAggregationFileControllerFactory(
+        this.nmContext.getConf());
   }
 
   private void init() {
@@ -253,10 +260,14 @@ public class NMWebServices {
       Application app = this.nmContext.getApplications().get(appId);
       String appOwner = app == null ? null : app.getUser();
       try {
-        List<ContainerLogMeta> containerLogMeta = LogToolUtils
-            .getContainerLogMetaFromRemoteFS(this.nmContext.getConf(),
-                appId, containerIdStr,
-                this.nmContext.getNodeId().toString(), appOwner);
+        ContainerLogsRequest logRequest = new ContainerLogsRequest();
+        logRequest.setAppId(appId);
+        logRequest.setAppOwner(appOwner);
+        logRequest.setContainerId(containerIdStr);
+        logRequest.setNodeId(this.nmContext.getNodeId().toString());
+        List<ContainerLogMeta> containerLogMeta = factory
+            .getFileControllerForRead(appId, appOwner)
+            .readAggregatedLogsMeta(logRequest);
         if (!containerLogMeta.isEmpty()) {
           for (ContainerLogMeta logMeta : containerLogMeta) {
             containersLogsInfo.add(new ContainerLogsInfo(logMeta,
@@ -441,12 +452,17 @@ public class NMWebServices {
             Application app = nmContext.getApplications().get(appId);
             String appOwner = app == null ? null : app.getUser();
             try {
-              int bufferSize = 65536;
-              byte[] buf = new byte[bufferSize];
-              LogToolUtils.outputAggregatedContainerLog(nmContext.getConf(),
-                  appId, appOwner, containerId.toString(),
-                  nmContext.getNodeId().toString(), outputFileName, bytes,
-                  os, buf);
+              ContainerLogsRequest logRequest = new ContainerLogsRequest();
+              logRequest.setAppId(appId);
+              logRequest.setAppOwner(appOwner);
+              logRequest.setContainerId(containerId.toString());
+              logRequest.setNodeId(nmContext.getNodeId().toString());
+              logRequest.setBytes(bytes);
+              Set<String> logTypes = new HashSet<>();
+              logTypes.add(outputFileName);
+              logRequest.setLogTypes(logTypes);
+              factory.getFileControllerForRead(appId, appOwner)
+                  .readAggregatedLogs(logRequest, os);
             } catch (IOException ex) {
               // Something wrong when we try to access the aggregated log.
               if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index 45b1771..37ffd00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
-import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.Context;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/119220b8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 6e085ce..ec1278f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -105,7 +105,7 @@ import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
 import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
-import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message