hadoop-mapreduce-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r818830 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
Date: Fri, 25 Sep 2009 12:15:38 GMT
Author: sharad
Date: Fri Sep 25 12:15:38 2009
New Revision: 818830

URL: http://svn.apache.org/viewvc?rev=818830&view=rev
Log:
MAPREDUCE-1000. Handle corrupt history files in JobHistory.initDone(). Contributed by Jothi
Padmanabhan.
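
In short, the patch wraps each per-file move in moveOldFiles() in a
try/catch: a ChecksumException marks the leftover history file as corrupt,
so the file is deleted instead of moved, while any other IOException is
logged and skipped so that a single bad file can no longer abort JobTracker
startup. A condensed sketch of the pattern (moveToDoneNow(), logDirFs,
fromPath and toPath are the names used in the patch below; the enclosing
loop and class are elided, and the patch additionally guards the delete
with its own try/catch):

    try {
      moveToDoneNow(fromPath, toPath);   // move via the checksummed file system
    } catch (ChecksumException e) {
      // CRC mismatch: the file is corrupt, delete it rather than move it.
      logDirFs.delete(fromPath, false);
    } catch (IOException e) {
      // Any other failure: log it and continue with the next file.
      LOG.warn("Error moving file " + fromPath + " to done folder. Ignoring.");
    }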

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=818830&r1=818829&r2=818830&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 25 12:15:38 2009
@@ -724,3 +724,5 @@
     MAPREDUCE-1022. Fix compilation of vertica testcases. (Vinod Kumar 
     Vavilapalli via acmurthy)
 
+    MAPREDUCE-1000. Handle corrupt history files in JobHistory.initDone().
+    (Jothi Padmanabhan via sharad)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=818830&r1=818829&r2=818830&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Fri Sep 25 12:15:38 2009
@@ -29,16 +29,15 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobTracker;
@@ -312,7 +311,7 @@
 
   private void moveOldFiles() throws IOException {
     //move the log files remaining from last run to the DONE folder
-    //suffix the file name based on Jobtracker identifier so that history
+    //suffix the file name based on Job tracker identifier so that history
     //files with same job id don't get over written in case of recovery.
     FileStatus[] files = logDirFs.listStatus(logDir);
     String jtIdentifier = jobTracker.getTrackerIdentifier();
@@ -324,7 +323,25 @@
       }
       LOG.info("Moving log file from last run: " + fromPath);
       Path toPath = new Path(done, fromPath.getName() + fileSuffix);
-      moveToDoneNow(fromPath, toPath);
+      try {
+        moveToDoneNow(fromPath, toPath);
+      } catch (ChecksumException e) {
+        // If there is an exception moving the file to done because of
+        // a checksum exception, just delete it
+        LOG.warn("Unable to move " + fromPath +", deleting it");
+        try {
+          boolean b = logDirFs.delete(fromPath, false);
+          LOG.debug("Deletion of corrupt file " + fromPath + " returned " + b);
+        } catch (IOException ioe) {
+          // Cannot delete either? Just log and carry on
+          LOG.warn("Unable to delete " + fromPath + "Exception: " +
+              ioe.getMessage());
+        }
+      } catch (IOException e) {
+        // Exceptions other than checksum, just log and continue
+        LOG.warn("Error moving file " + fromPath + " to done folder." +
+            "Ignoring.");
+      }
     }
   }
   

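Editorial background on why ChecksumException is the corruption signal here:
Hadoop's local file system (org.apache.hadoop.fs.LocalFileSystem) is a
ChecksumFileSystem that keeps a ".<name>.crc" sidecar next to each file and
verifies it on every read, so data rewritten behind the checksum layer fails
the next verified read. That is exactly the situation the test below sets up.
A minimal self-contained sketch (the path /tmp/stale-crc-demo is hypothetical;
any writable local directory works):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ChecksumException;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    public class StaleCrcDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        LocalFileSystem cfs = FileSystem.getLocal(conf); // checksummed view
        Path p = new Path("/tmp/stale-crc-demo");

        // Write through the checksummed view; this also writes the .crc sidecar.
        FSDataOutputStream out = cfs.create(p, true);
        out.writeBytes("original contents");
        out.close();

        // Rewrite through the raw view; the data changes, the .crc does not.
        FSDataOutputStream raw = cfs.getRaw().create(p, true);
        raw.writeBytes("something else entirely");
        raw.close();

        try {
          cfs.open(p).read();   // verified read hits the stale checksum
        } catch (ChecksumException e) {
          System.out.println("CRC mismatch detected: " + e.getMessage());
        }
      }
    }
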
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java?rev=818830&r1=818829&r2=818830&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistory.java Fri Sep 25 12:15:38 2009
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -33,8 +34,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Cluster;
@@ -46,6 +50,7 @@
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -615,6 +620,16 @@
       mr = new MiniMRCluster(2, dfsCluster.getFileSystem().getUri().toString(),
           3, null, null, conf);
 
+      assertEquals("Files in logDir did not move to DONE folder",
+          0, logDirFs.listStatus(logDirPath).length);
+
+      JobHistory jobHistory = 
+        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
+      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
+
+      assertEquals("Files in DONE dir not correct",
+          2, doneDir.getFileSystem(conf).listStatus(doneDir).length);
+
       // run the TCs
       conf = mr.createJobConf();
 
@@ -635,9 +650,6 @@
       // Run a job that will be succeeded and validate its history file
       RunningJob job = UtilsForTests.runJobSucceed(conf, inDir, outDir);
       
-      JobHistory jobHistory = 
-        mr.getJobTrackerRunner().getJobTracker().getJobHistory();
-      Path doneDir = jobHistory.getCompletedJobHistoryLocation();
       assertEquals("History DONE folder not correct", 
           doneFolder, doneDir.getName());
       JobID id = job.getID();
@@ -900,4 +912,65 @@
     }
   }
 
+  public void testHistoryInitWithCorruptFiles() throws IOException {
+    MiniMRCluster mr = null;
+    try {
+      JobConf conf = new JobConf();
+      Path historyDir = new Path(System.getProperty("test.build.data", "."),
+      "history");
+      conf.set(JTConfig.JT_JOBHISTORY_LOCATION,
+          historyDir.toString());
+      conf.setUser("user");
+
+      FileSystem localFs = FileSystem.getLocal(conf);
+      
+      //there may be some stale files, clean them
+      if (localFs.exists(historyDir)) {
+        boolean deleted = localFs.delete(historyDir, true);
+        LOG.info(historyDir + " deleted " + deleted);
+      }
+
+      // Start the cluster, create a history file
+      mr = new MiniMRCluster(0, "file:///", 3, null, null, conf);
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      JobHistory jh = jt.getJobHistory();
+      final JobID jobId = JobID.forName("job_200809171136_0001");
+      jh.setupEventWriter(jobId, conf);
+      JobSubmittedEvent jse =
+        new JobSubmittedEvent(jobId, "job", "user", 12345, "path");
+      jh.logEvent(jse, jobId);
+      jh.closeWriter(jobId);
+
+      // Corrupt the history file. Use RawLocalFileSystem so that the
+      // original CRC file stays intact.
+      String historyFileName = jobId.toString() + "_" + "user";
+      Path historyFilePath = new Path(historyDir.toString(), historyFileName);
+
+      RawLocalFileSystem fs = (RawLocalFileSystem)
+        FileSystem.getLocal(conf).getRaw();
+
+      FSDataOutputStream out = fs.create(historyFilePath, true);
+      byte[] corruptData = new byte[32];
+      new Random().nextBytes(corruptData);
+      out.write(corruptData, 0, 32);
+      out.close();
+
+      // Stop and start the tracker. The tracker should come up nicely
+      mr.stopJobTracker();
+      mr.startJobTracker();
+      jt = mr.getJobTrackerRunner().getJobTracker();
+      assertNotNull("JobTracker did not come up", jt );
+      jh = jt.getJobHistory();
+      assertNotNull("JobHistory did not get initialized correctly", jh);
+
+      // Only the done folder should remain in the history directory
+      assertEquals("Files in logDir did not move to DONE folder",
+          1, historyDir.getFileSystem(conf).listStatus(historyDir).length);
+    } finally {
+      if (mr != null) {
+        cleanupLocalFiles(mr);
+        mr.shutdown();
+      }
+    }
+  }
 }
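
Editorial note on what the restart exercises: bringing the JobTracker back up
re-runs JobHistory.initDone(), whose moveOldFiles() pass (patched above) tries
to move the deliberately corrupted file into the DONE folder. That move fails
with a ChecksumException because the data no longer matches its CRC sidecar,
the new catch block deletes the file, and the final assertEquals confirms that
only the DONE folder remains in the history directory.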


