hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077309 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
Date Fri, 04 Mar 2011 04:02:07 GMT
Author: omalley
Date: Fri Mar  4 04:02:07 2011
New Revision: 1077309

URL: http://svn.apache.org/viewvc?rev=1077309&view=rev
Log:
commit 34f684ec5894f1fab5042aaf23e979a497ce79ff
Author: Vinod Kumar <vinodkv@yahoo-inc.com>
Date:   Wed Mar 10 15:45:12 2010 +0530

    MAPREDUCE-1100 index files related fix from https://issues.apache.org/jira/secure/attachment/12438394/patch-1100-fix-ydist.2.txt

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java?rev=1077309&r1=1077308&r2=1077309&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskLogsMonitor.java
Fri Mar  4 04:02:07 2011
@@ -106,8 +106,16 @@ class TaskLogsMonitor extends Thread {
       return;
     }
 
+    // set this boolean to true if any of the log files is truncated
+    boolean truncated = false;
+
     Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails =
         new HashMap<Task, Map<LogName, LogFileDetail>>();
+    // Copy the original indices into the updated indices
+    for (LogName logName : LogName.values()) {
+      copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
+          updatedTaskLogFileDetails, logName);
+    }
 
     File attemptLogDir = TaskLog.getBaseDir(firstAttempt.toString());
 
@@ -142,8 +150,10 @@ class TaskLogsMonitor extends Thread {
       try {
         logFileReader = new FileReader(logFile);
       } catch (FileNotFoundException fe) {
-        LOG.warn("Cannot open " + logFile.getAbsolutePath()
-            + " for reading. Continuing with other log files");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cannot open " + logFile.getAbsolutePath()
+              + " for reading. Continuing with other log files");
+        }
         if (!tmpFile.delete()) {
           LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
         }
@@ -171,7 +181,7 @@ class TaskLogsMonitor extends Thread {
               + ". Caught exception while handling " + task.getTaskID(),
               ioe);
           // revert back updatedTaskLogFileDetails
-          revertIndexFileInfo(lInfo, taskLogFileDetails,
+          copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
               updatedTaskLogFileDetails, logName);
           if (!tmpFile.delete()) {
             LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
@@ -191,6 +201,7 @@ class TaskLogsMonitor extends Thread {
           newLogFileDetail.start = newCurrentOffset;
           updatedTaskLogFileDetails.get(task).put(logName, newLogFileDetail);
           newCurrentOffset += newLogFileDetail.length;
+          truncated = true; // set the flag truncated
         }
       }
 
@@ -199,7 +210,7 @@ class TaskLogsMonitor extends Thread {
       } catch (IOException ioe) {
         LOG.warn("Couldn't close the tmp file " + tmpFile.getAbsolutePath()
             + ". Deleting it.", ioe);
-        revertIndexFileInfo(lInfo, taskLogFileDetails,
+        copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
             updatedTaskLogFileDetails, logName);
         if (!tmpFile.delete()) {
           LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
@@ -211,7 +222,7 @@ class TaskLogsMonitor extends Thread {
         // If the tmpFile cannot be renamed revert back
         // updatedTaskLogFileDetails to maintain the consistency of the
         // original log file
-        revertIndexFileInfo(lInfo, taskLogFileDetails,
+        copyOriginalIndexFileInfo(lInfo, taskLogFileDetails,
             updatedTaskLogFileDetails, logName);
         if (!tmpFile.delete()) {
           LOG.warn("Cannot delete tmpFile " + tmpFile.getAbsolutePath());
@@ -219,8 +230,10 @@ class TaskLogsMonitor extends Thread {
       }
     }
 
-    // Update the index files
-    updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
+    if (truncated) {
+      // Update the index files
+      updateIndicesAfterLogTruncation(firstAttempt, updatedTaskLogFileDetails);
+    }
   }
 
   /**
@@ -229,7 +242,7 @@ class TaskLogsMonitor extends Thread {
    * @param updatedTaskLogFileDetails
    * @param logName
    */
-  private void revertIndexFileInfo(PerJVMInfo lInfo,
+  private void copyOriginalIndexFileInfo(PerJVMInfo lInfo,
       Map<Task, Map<LogName, LogFileDetail>> taskLogFileDetails,
       Map<Task, Map<LogName, LogFileDetail>> updatedTaskLogFileDetails,
       LogName logName) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java?rev=1077309&r1=1077308&r2=1077309&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
(original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskLogsMonitor.java
Fri Mar  4 04:02:07 2011
@@ -145,11 +145,17 @@ public class TestTaskLogsMonitor {
                             0);
 
     // Let the tasks write logs within retain-size
-    writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 500, 'H');
+    for (LogName log : LogName.values()) {
+      writeRealBytes(attemptID, attemptID, log, 500, 'H');
+    }
+    File logIndex = TaskLog.getIndexFile(attemptID.toString(), false);
+    long indexModificationTimeStamp = logIndex.lastModified();
 
     logsMonitor.monitorTaskLogs();
     File attemptDir = TaskLog.getBaseDir(attemptID.toString());
     assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+    assertEquals("index file got modified", indexModificationTimeStamp,
+        logIndex.lastModified());
 
     // Finish the task and the JVM too.
     logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
@@ -157,14 +163,29 @@ public class TestTaskLogsMonitor {
     // There should be no truncation of the log-file.
     logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
-    File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
-    assertEquals(500, logFile.length());
-    // The index file should also be proper.
-    assertEquals(500, getAllLogsFileLengths(attemptID, false).get(
-        LogName.SYSLOG).longValue());
+    assertEquals("index file got modified", indexModificationTimeStamp,
+        logIndex.lastModified());
+
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(500, logFile.length());
+      // The index file should also be proper.
+      assertEquals(500, logLengths.get(log).longValue());
+    }
 
+    // run the monitor once again; still nothing should be truncated
     logsMonitor.monitorTaskLogs();
-    assertEquals(500, logFile.length());
+    assertEquals("index file got modified", indexModificationTimeStamp,
+        logIndex.lastModified());
+    
+    logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(500, logFile.length());
+      // The index file should also be proper.
+      assertEquals(500, logLengths.get(log).longValue());
+    }
   }
 
   /**
@@ -187,7 +208,9 @@ public class TestTaskLogsMonitor {
                             0);
 
     // Let the tasks write some logs
-    writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
+    for (LogName log : LogName.values()) {
+      writeRealBytes(attemptID, attemptID, log, 1500, 'H');
+    }
 
     logsMonitor.monitorTaskLogs();
     File attemptDir = TaskLog.getBaseDir(attemptID.toString());
@@ -199,11 +222,13 @@ public class TestTaskLogsMonitor {
     // The log-file should not be truncated.
     logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
-    File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
-    assertEquals(1500, logFile.length());
-    // The index file should also be proper.
-    assertEquals(1500, getAllLogsFileLengths(attemptID, false).get(
-        LogName.SYSLOG).longValue());
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(1500, logFile.length());
+      // The index file should also be proper.
+      assertEquals(1500, logLengths.get(log).longValue());
+    }
   }
 
   /**
@@ -225,7 +250,63 @@ public class TestTaskLogsMonitor {
                             0);
 
     // Let the tasks write logs more than retain-size
+    for (LogName log : LogName.values()) {
+      writeRealBytes(attemptID, attemptID, log, 1500, 'H');
+    }
+
+    logsMonitor.monitorTaskLogs();
+    File attemptDir = TaskLog.getBaseDir(attemptID.toString());
+    assertTrue(attemptDir + " doesn't exist!", attemptDir.exists());
+
+    // Finish the task and the JVM too.
+    logsMonitor.addProcessForLogTruncation(attemptID, Arrays.asList(task));
+
+    // The log-file should now be truncated.
+    logsMonitor.monitorTaskLogs();
+    assertTrue(attemptDir.exists());
+
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(1000, logFile.length());
+      // The index file should also be proper.
+      assertEquals(1000, logLengths.get(log).longValue());
+    }
+
+    // truncate once again
+    logsMonitor.monitorTaskLogs();
+    logLengths = getAllLogsFileLengths(attemptID, false);
+    for (LogName log : LogName.values()) {
+      File logFile = TaskLog.getTaskLogFile(attemptID, log);
+      assertEquals(1000, logFile.length());
+      // The index file should also be proper.
+      assertEquals(1000, logLengths.get(log).longValue());
+    }
+  }
+
+  /**
+   * Test the truncation of log-file.
+   * 
+   * It writes two log files and truncates one, but does not truncate the other.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testLogTruncation() throws IOException {
+    TaskTracker taskTracker = new TaskTracker();
+    TaskLogsMonitor logsMonitor = new TaskLogsMonitor(1000L, 1000L);
+    taskTracker.setTaskLogsMonitor(logsMonitor);
+
+    TaskID baseId = new TaskID();
+    int taskcount = 0;
+
+    TaskAttemptID attemptID = new TaskAttemptID(baseId, taskcount++);
+    Task task = new MapTask(null, attemptID, 0, new JobSplit.TaskSplitIndex(), 
+                            0);
+
+    // Let syslog exceed the retain-size while stderr stays within it
     writeRealBytes(attemptID, attemptID, LogName.SYSLOG, 1500, 'H');
+    writeRealBytes(attemptID, attemptID, LogName.STDERR, 500, 'H');
 
     logsMonitor.monitorTaskLogs();
     File attemptDir = TaskLog.getBaseDir(attemptID.toString());
@@ -237,14 +318,28 @@ public class TestTaskLogsMonitor {
     // The log-file should now be truncated.
     logsMonitor.monitorTaskLogs();
     assertTrue(attemptDir.exists());
+
+    Map<LogName, Long> logLengths = getAllLogsFileLengths(attemptID, false);
     File logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
     assertEquals(1000, logFile.length());
     // The index file should also be proper.
-    assertEquals(1000, getAllLogsFileLengths(attemptID, false).get(
-        LogName.SYSLOG).longValue());
+    assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
+    logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+    assertEquals(500, logFile.length());
+    // The index file should also be proper.
+    assertEquals(500, logLengths.get(LogName.STDERR).longValue());
 
+    // truncate once again
     logsMonitor.monitorTaskLogs();
+    logLengths = getAllLogsFileLengths(attemptID, false);
+    logFile = TaskLog.getTaskLogFile(attemptID, LogName.SYSLOG);
     assertEquals(1000, logFile.length());
+    // The index file should also be proper.
+    assertEquals(1000, logLengths.get(LogName.SYSLOG).longValue());
+    logFile = TaskLog.getTaskLogFile(attemptID, LogName.STDERR);
+    assertEquals(500, logFile.length());
+    // The index file should also be proper.
+    assertEquals(500, logLengths.get(LogName.STDERR).longValue());
   }
 
   /**
@@ -348,6 +443,7 @@ public class TestTaskLogsMonitor {
       new File(System.getProperty("test.build.data", "/tmp")).toURI().toString().replace(
           ' ', '+');
 
+  private static String STDERR_LOG = "stderr log";
   public static class LoggingMapper<K, V> extends IdentityMapper<K, V> {
 
     public void map(K key, V val, OutputCollector<K, V> output,
@@ -357,6 +453,8 @@ public class TestTaskLogsMonitor {
         System.out.println("Lots of logs! Lots of logs! "
             + "Waiting to be truncated! Lots of logs!");
       }
+      // write some log into stderr
+      System.err.println(STDERR_LOG);
       super.map(key, val, output, reporter);
     }
   }
@@ -410,6 +508,14 @@ public class TestTaskLogsMonitor {
                 TaskLog.LogName.STDOUT).length();
         assertTrue("STDOUT log file length for " + tce.getTaskAttemptId()
             + " is " + length + " and not <=10000", length <= 10000);
+        if (tce.isMap) {
+          String stderr = TestMiniMRMapRedDebugScript.readTaskLog(
+              LogName.STDERR, tce.getTaskAttemptId(), false);
+          System.out.println("STDERR log:" + stderr);
+          assertTrue(stderr.length() > 0);
+          assertTrue(stderr.length() < 10000);
+          assertTrue(stderr.equals(STDERR_LOG));
+        }
       }
     } finally {
       if (mr != null) {



Mime
View raw message