hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: YARN-6288. Exceptions during aggregated log writes are mishandled. Contributed by Akira Ajisaka
Date Thu, 06 Apr 2017 21:26:09 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 832c2f52f -> 84b7f2e95


YARN-6288. Exceptions during aggregated log writes are mishandled. Contributed by Akira Ajisaka

(cherry picked from commit 1b081ca27e05e97d8b7d284ca24200d43763e481)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84b7f2e9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84b7f2e9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84b7f2e9

Branch: refs/heads/branch-2
Commit: 84b7f2e956d30f17b579c2ec4788b0911cb4d7fc
Parents: 832c2f5
Author: Jason Lowe <jlowe@yahoo-inc.com>
Authored: Thu Apr 6 16:24:36 2017 -0500
Committer: Jason Lowe <jlowe@yahoo-inc.com>
Committed: Thu Apr 6 16:25:41 2017 -0500

----------------------------------------------------------------------
 .../hadoop/yarn/client/cli/TestLogsCLI.java     |  58 ++++-----
 .../logaggregation/AggregatedLogFormat.java     |  30 +++--
 .../logaggregation/TestAggregatedLogFormat.java | 123 ++++++++++---------
 .../logaggregation/TestAggregatedLogsBlock.java |  25 ++--
 .../logaggregation/TestContainerLogsUtils.java  |  15 +--
 .../logaggregation/AppLogAggregatorImpl.java    |  70 +++++------
 .../TestAppLogAggregatorImpl.java               |  10 +-
 7 files changed, 170 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/84b7f2e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
index 22ab55c..31908b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java
@@ -1345,18 +1345,18 @@ public class TestLogsCLI {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
             + System.currentTimeMillis());
-    AggregatedLogFormat.LogWriter writer =
-        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
-    writer.writeApplicationOwner(ugi.getUserName());
-
-    Map<ApplicationAccessType, String> appAcls =
-        new HashMap<ApplicationAccessType, String>();
-    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-    writer.writeApplicationACLs(appAcls);
-    writer.append(new AggregatedLogFormat.LogKey(containerId),
-      new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
-        UserGroupInformation.getCurrentUser().getShortUserName()));
-    writer.close();
+    try (AggregatedLogFormat.LogWriter writer =
+             new AggregatedLogFormat.LogWriter()) {
+      writer.initialize(configuration, path, ugi);
+      writer.writeApplicationOwner(ugi.getUserName());
+
+      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      writer.writeApplicationACLs(appAcls);
+      writer.append(new AggregatedLogFormat.LogKey(containerId),
+          new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+              UserGroupInformation.getCurrentUser().getShortUserName()));
+    }
   }
 
   private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
@@ -1365,23 +1365,23 @@ public class TestLogsCLI {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
             + System.currentTimeMillis());
-    AggregatedLogFormat.LogWriter writer =
-        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
-    writer.writeApplicationOwner(ugi.getUserName());
-
-    Map<ApplicationAccessType, String> appAcls =
-        new HashMap<ApplicationAccessType, String>();
-    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-    writer.writeApplicationACLs(appAcls);
-    DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
-    new AggregatedLogFormat.LogKey(containerId).write(out);
-    out.close();
-    out = writer.getWriter().prepareAppendValue(-1);
-    new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
-      UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
-      new HashSet<File>());
-    out.close();
-    writer.close();
+    try (AggregatedLogFormat.LogWriter writer =
+             new AggregatedLogFormat.LogWriter()) {
+      writer.initialize(configuration, path, ugi);
+      writer.writeApplicationOwner(ugi.getUserName());
+
+      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      writer.writeApplicationACLs(appAcls);
+      DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
+      new AggregatedLogFormat.LogKey(containerId).write(out);
+      out.close();
+      out = writer.getWriter().prepareAppendValue(-1);
+      new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+          UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
+              new HashSet<>());
+      out.close();
+    }
   }
 
   private YarnClient createMockYarnClient(YarnApplicationState appState,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84b7f2e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
index eeb49e4..aa29386 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogFormat.java
@@ -444,14 +444,23 @@ public class AggregatedLogFormat {
    * The writer that writes out the aggregated logs.
    */
   @Private
-  public static class LogWriter {
+  public static class LogWriter implements AutoCloseable {
 
-    private final FSDataOutputStream fsDataOStream;
-    private final TFile.Writer writer;
+    private FSDataOutputStream fsDataOStream;
+    private TFile.Writer writer;
     private FileContext fc;
 
-    public LogWriter(final Configuration conf, final Path remoteAppLogFile,
-        UserGroupInformation userUgi) throws IOException {
+    /**
+     * Initialize the LogWriter.
+     * Must be called just after the instance is created.
+     * @param conf Configuration
+     * @param remoteAppLogFile remote log file path
+     * @param userUgi Ugi of the user
+     * @throws IOException Failed to initialize
+     */
+    public void initialize(final Configuration conf,
+                           final Path remoteAppLogFile,
+                           UserGroupInformation userUgi) throws IOException {
       try {
         this.fsDataOStream =
             userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
@@ -529,11 +538,14 @@ public class AggregatedLogFormat {
       }
     }
 
+    @Override
     public void close() {
-      try {
-        this.writer.close();
-      } catch (IOException e) {
-        LOG.warn("Exception closing writer", e);
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.warn("Exception closing writer", e);
+        }
       }
       IOUtils.closeStream(fsDataOStream);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84b7f2e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
index 8cbec10..efbaa4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogFormat.java
@@ -140,44 +140,44 @@ public class TestAggregatedLogFormat {
     final int ch = filler;
 
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    LogWriter logWriter = new LogWriter(new Configuration(), remoteAppLogFile,
-        ugi);
+    try (LogWriter logWriter = new LogWriter()) {
+      logWriter.initialize(new Configuration(), remoteAppLogFile, ugi);
 
-    LogKey logKey = new LogKey(testContainerId);
-    LogValue logValue =
-        spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
-            testContainerId, ugi.getShortUserName()));
+      LogKey logKey = new LogKey(testContainerId);
+      LogValue logValue =
+          spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
+              testContainerId, ugi.getShortUserName()));
 
-    final CountDownLatch latch = new CountDownLatch(1);
+      final CountDownLatch latch = new CountDownLatch(1);
 
-    Thread t = new Thread() {
-      public void run() {
-        try {
-          for(int i=0; i < length/3; i++) {
+      Thread t = new Thread() {
+        public void run() {
+          try {
+            for (int i = 0; i < length / 3; i++) {
               osw.write(ch);
-          }
+            }
 
-          latch.countDown();
+            latch.countDown();
 
-          for(int i=0; i < (2*length)/3; i++) {
-            osw.write(ch);
+            for (int i = 0; i < (2 * length) / 3; i++) {
+              osw.write(ch);
+            }
+            osw.close();
+          } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
           }
-          osw.close();
-        } catch (IOException e) {
-          // TODO Auto-generated catch block
-          e.printStackTrace();
         }
-      }
-    };
-    t.start();
+      };
+      t.start();
 
-    //Wait till the osw is partially written
-    //aggregation starts once the ows has completed 1/3rd of its work
-    latch.await();
+      //Wait till the osw is partially written
+      //aggregation starts once the ows has completed 1/3rd of its work
+      latch.await();
 
-    //Aggregate The Logs
-    logWriter.append(logKey, logValue);
-    logWriter.close();
+      //Aggregate The Logs
+      logWriter.append(logKey, logValue);
+    }
   }
 
   @Test
@@ -216,22 +216,23 @@ public class TestAggregatedLogFormat {
     writeSrcFile(srcFilePath, "stdout", numChars);
 
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
-
-    LogKey logKey = new LogKey(testContainerId);
-    LogValue logValue =
-        new LogValue(Collections.singletonList(srcFileRoot.toString()),
-            testContainerId, ugi.getShortUserName());
-
-    // When we try to open FileInputStream for stderr, it will throw out an IOException.
-    // Skip the log aggregation for stderr.
-    LogValue spyLogValue = spy(logValue);
-    File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
-    doThrow(new IOException("Mock can not open FileInputStream")).when(
-      spyLogValue).secureOpenFile(errorFile);
-
-    logWriter.append(logKey, spyLogValue);
-    logWriter.close();
+    try (LogWriter logWriter = new LogWriter()) {
+      logWriter.initialize(conf, remoteAppLogFile, ugi);
+
+      LogKey logKey = new LogKey(testContainerId);
+      LogValue logValue =
+          new LogValue(Collections.singletonList(srcFileRoot.toString()),
+              testContainerId, ugi.getShortUserName());
+
+      // When we try to open FileInputStream for stderr, it will throw out an
+      // IOException. Skip the log aggregation for stderr.
+      LogValue spyLogValue = spy(logValue);
+      File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
+      doThrow(new IOException("Mock can not open FileInputStream")).when(
+          spyLogValue).secureOpenFile(errorFile);
+
+      logWriter.append(logKey, spyLogValue);
+    }
 
     // make sure permission are correct on the file
     FileStatus fsStatus =  fs.getFileStatus(remoteAppLogFile);
@@ -311,24 +312,24 @@ public class TestAggregatedLogFormat {
 
     UserGroupInformation ugi =
         UserGroupInformation.getCurrentUser();
-    LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
+    try (LogWriter logWriter = new LogWriter()) {
+      logWriter.initialize(conf, remoteAppLogFile, ugi);
+
+      LogKey logKey = new LogKey(testContainerId1);
+      String randomUser = "randomUser";
+      LogValue logValue =
+          spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
+              testContainerId1, randomUser));
+
+      // It is trying simulate a situation where first log file is owned by
+      // different user (probably symlink) and second one by the user itself.
+      // The first file should not be aggregated. Because this log file has
+      // the invalid user name.
+      when(logValue.getUser()).thenReturn(randomUser).thenReturn(
+          ugi.getShortUserName());
+      logWriter.append(logKey, logValue);
+    }
 
-    LogKey logKey = new LogKey(testContainerId1);
-    String randomUser = "randomUser";
-    LogValue logValue =
-        spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
-            testContainerId1, randomUser));
-    
-    // It is trying simulate a situation where first log file is owned by
-    // different user (probably symlink) and second one by the user itself.
-    // The first file should not be aggregated. Because this log file has the invalid
-    // user name.
-    when(logValue.getUser()).thenReturn(randomUser).thenReturn(
-        ugi.getShortUserName());
-    logWriter.append(logKey, logValue);
-
-    logWriter.close();
-    
     BufferedReader in =
         new BufferedReader(new FileReader(new File(remoteAppLogFile
             .toUri().getRawPath())));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84b7f2e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
index 594f186..1e71b3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java
@@ -295,17 +295,20 @@ public class TestAggregatedLogsBlock {
     List<String> rootLogDirs = Arrays.asList("target/logs/logs");
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
 
-    AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter(
-        configuration, new Path(path), ugi);
-    writer.writeApplicationOwner(ugi.getUserName());
-
-    Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType,
String>();
-    appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
-    writer.writeApplicationACLs(appAcls);
-
-    writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
-        new AggregatedLogFormat.LogValue(rootLogDirs, containerId,UserGroupInformation.getCurrentUser().getShortUserName()));
-    writer.close();
+    try (AggregatedLogFormat.LogWriter writer =
+             new AggregatedLogFormat.LogWriter()) {
+      writer.initialize(configuration, new Path(path), ugi);
+      writer.writeApplicationOwner(ugi.getUserName());
+
+      Map<ApplicationAccessType, String> appAcls = new HashMap<>();
+      appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
+      writer.writeApplicationACLs(appAcls);
+
+      writer.append(
+          new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
+          new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+              UserGroupInformation.getCurrentUser().getShortUserName()));
+    }
   }
 
   private void writeLogs(String dirName) throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84b7f2e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
index c6841c9..8b665e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java
@@ -110,13 +110,14 @@ public final class TestContainerLogsUtils {
       ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
     Path path =
         new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
-    AggregatedLogFormat.LogWriter writer =
-        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
-    writer.writeApplicationOwner(ugi.getUserName());
+    try (AggregatedLogFormat.LogWriter writer =
+        new AggregatedLogFormat.LogWriter()) {
+      writer.initialize(configuration, path, ugi);
+      writer.writeApplicationOwner(ugi.getUserName());
 
-    writer.append(new AggregatedLogFormat.LogKey(containerId),
-        new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
-        ugi.getShortUserName()));
-    writer.close();
+      writer.append(new AggregatedLogFormat.LogKey(containerId),
+          new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+              ugi.getShortUserName()));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84b7f2e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index d70acc9..f465534 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -295,18 +295,18 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       }
     }
 
-    LogWriter writer = null;
+    if (pendingContainerInThisCycle.isEmpty()) {
+      sendLogAggregationReport(true, "", appFinished);
+      return;
+    }
+
+    logAggregationTimes++;
     String diagnosticMessage = "";
     boolean logAggregationSucceedInThisCycle = true;
-    try {
-      if (pendingContainerInThisCycle.isEmpty()) {
-        return;
-      }
-
-      logAggregationTimes++;
-
+    try (LogWriter writer = createLogWriter()) {
       try {
-        writer = createLogWriter();
+        writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
+            this.userUgi);
         // Write ACLs once when the writer is created.
         writer.writeApplicationACLs(appAcls);
         writer.writeApplicationOwner(this.userUgi.getShortUserName());
@@ -351,11 +351,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         cleanupOldLogTimes++;
       }
 
-      if (writer != null) {
-        writer.close();
-        writer = null;
-      }
-
       long currentTime = System.currentTimeMillis();
       final Path renamedPath = this.rollingMonitorInterval <= 0
               ? remoteNodeLogFileForApp : new Path(
@@ -396,34 +391,37 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
         logAggregationSucceedInThisCycle = false;
       }
     } finally {
-      LogAggregationStatus logAggregationStatus =
-          logAggregationSucceedInThisCycle
-              ? LogAggregationStatus.RUNNING
-              : LogAggregationStatus.RUNNING_WITH_FAILURE;
-      sendLogAggregationReport(logAggregationStatus, diagnosticMessage);
-      if (appFinished) {
-        // If the app is finished, one extra final report with log aggregation
-        // status SUCCEEDED/FAILED will be sent to RM to inform the RM
-        // that the log aggregation in this NM is completed.
-        LogAggregationStatus finalLogAggregationStatus =
-            renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
-                ? LogAggregationStatus.FAILED
-                : LogAggregationStatus.SUCCEEDED;
-        sendLogAggregationReport(finalLogAggregationStatus, "");
-      }
-
-      if (writer != null) {
-        writer.close();
-      }
+      sendLogAggregationReport(logAggregationSucceedInThisCycle,
+          diagnosticMessage, appFinished);
     }
   }
 
-  protected LogWriter createLogWriter() throws IOException {
-    return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
-        this.userUgi);
+  @VisibleForTesting
+  protected LogWriter createLogWriter() {
+    return new LogWriter();
   }
 
   private void sendLogAggregationReport(
+      boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
+      boolean appFinished) {
+    LogAggregationStatus logAggregationStatus =
+        logAggregationSucceedInThisCycle
+            ? LogAggregationStatus.RUNNING
+            : LogAggregationStatus.RUNNING_WITH_FAILURE;
+    sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
+    if (appFinished) {
+      // If the app is finished, one extra final report with log aggregation
+      // status SUCCEEDED/FAILED will be sent to RM to inform the RM
+      // that the log aggregation in this NM is completed.
+      LogAggregationStatus finalLogAggregationStatus =
+          renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
+              ? LogAggregationStatus.FAILED
+              : LogAggregationStatus.SUCCEEDED;
+      sendLogAggregationReportInternal(finalLogAggregationStatus, "");
+    }
+  }
+
+  private void sendLogAggregationReportInternal(
       LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
     LogAggregationReport report =
         Records.newRecord(LogAggregationReport.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/84b7f2e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index 93d6677..75f785a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -416,8 +416,7 @@ public class TestAppLogAggregatorImpl {
           logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
       this.applicationId = appId;
       this.deletionService = deletionService;
-
-      this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp);
+      this.logWriter = spy(new LogWriter());
       this.logValue = ArgumentCaptor.forClass(LogValue.class);
     }
 
@@ -425,10 +424,5 @@ public class TestAppLogAggregatorImpl {
     protected LogWriter createLogWriter() {
       return this.logWriter;
     }
-
-    private LogWriter getSpiedLogWriter(Configuration conf,
-        UserGroupInformation ugi, Path remoteAppLogFile) throws IOException {
-      return spy(new LogWriter(conf, remoteAppLogFile, ugi));
-    }
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message