hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r816454 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
Date Fri, 18 Sep 2009 03:23:08 GMT
Author: cdouglas
Date: Fri Sep 18 03:23:07 2009
New Revision: 816454

URL: http://svn.apache.org/viewvc?rev=816454&view=rev
Log:
MAPREDUCE-995. Fix a bug in JobHistory where tasks completing after the job
is closed cause an NPE. Contributed by Jothi Padmanabhan

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816454&r1=816453&r2=816454&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 03:23:07 2009
@@ -629,3 +629,6 @@
 
     MAPREDUCE-971. distcp does not always remove distcp.tmp.dir. (Aaron Kimball
     via tomwhite)
+
+    MAPREDUCE-995. Fix a bug in JobHistory where tasks completing after the job
+    is closed cause a NPE. (Jothi Padmanabhan via cdouglas)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=816454&r1=816453&r2=816454&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Fri Sep 18 03:23:07 2009
@@ -21,7 +21,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Date;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -55,7 +55,8 @@
   final Log LOG = LogFactory.getLog(JobHistory.class);
 
   private long jobHistoryBlockSize;
-  private Map<JobID, MetaInfo> fileMap;
+  private final Map<JobID, MetaInfo> fileMap =
+    Collections.<JobID,MetaInfo>synchronizedMap(new HashMap<JobID,MetaInfo>());
   private ThreadPoolExecutor executor = null;
   static final FsPermission HISTORY_DIR_PERMISSION =
     FsPermission.createImmutable((short) 0750); // rwxr-x---
@@ -115,8 +116,6 @@
           3 * 1024 * 1024);
     
     jobTracker = jt;
-    
-    fileMap = new HashMap<JobID, MetaInfo> ();
   }
   
   /** Initialize the done directory and start the history cleaner thread */
@@ -305,29 +304,15 @@
   /** Close the event writer for this id */
   public void closeWriter(JobID id) {
     try {
-      EventWriter writer = getWriter(id);
-      writer.close();
+      final MetaInfo mi = fileMap.get(id);
+      if (mi != null) {
+        mi.closeWriter();
+      }
     } catch (IOException e) {
       LOG.info("Error closing writer for JobID: " + id);
     }
   }
 
-
-  /**
-   * Get the JsonEventWriter for the specified Job Id
-   * @param jobId
-   * @return
-   * @throws IOException if a writer is not available
-   */
-  private EventWriter getWriter(final JobID jobId) throws IOException {
-    EventWriter writer = null;
-    MetaInfo mi = fileMap.get(jobId);
-    if (mi == null || (writer = mi.getEventWriter()) == null) {
-      throw new IOException("History File does not exist for JobID");
-    }
-    return writer;
-  }
-
   /**
    * Method to log the specified event
    * @param event The event to log
@@ -335,10 +320,12 @@
    */
   public void logEvent(HistoryEvent event, JobID id) {
     try {
-      EventWriter writer = getWriter(id);
-      writer.write(event);
+      final MetaInfo mi = fileMap.get(id);
+      if (mi != null) {
+        mi.writeEvent(event);
+      }
     } catch (IOException e) {
-      LOG.error("Error creating writer, " + e.getMessage());
+      LOG.error("Error Logging event, " + e.getMessage());
     }
   }
 
@@ -388,7 +375,7 @@
   
   private void moveToDone(final JobID id) {
     final List<Path> paths = new ArrayList<Path>();
-    MetaInfo metaInfo = fileMap.get(id);
+    final MetaInfo metaInfo = fileMap.get(id);
     if (metaInfo == null) {
       LOG.info("No file for job-history with " + id + " found in cache!");
       return;
@@ -456,7 +443,19 @@
 
     Path getHistoryFile() { return historyFile; }
     Path getConfFile() { return confFile; }
-    EventWriter getEventWriter() { return writer; }
+
+    synchronized void closeWriter() throws IOException {
+      if (writer != null) {
+        writer.close();
+      }
+      writer = null;
+    }
+
+    synchronized void writeEvent(HistoryEvent event) throws IOException {
+      if (writer != null) {
+        writer.write(event);
+      }
+    }
   }
 
   /**

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java?rev=816454&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobHistoryParsing.java Fri Sep 18 03:23:07 2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+
+/**
+ * Unit test to test if the JobHistory writer/parser is able to handle
+ * values with special characters
+ * This test also tests if the job history module is able to gracefully
+ * ignore events after the event writer is closed
+ *
+ */
+public class TestJobHistoryParsing  extends TestCase {
+
+  public void testHistoryParsing() throws IOException {
+    // open a test history file
+    Path historyDir = new Path(System.getProperty("test.build.data", "."),
+                                "history");
+    JobConf conf = new JobConf();
+    conf.set("hadoop.job.history.location", historyDir.toString());
+    FileSystem fs = FileSystem.getLocal(new JobConf());
+
+    // Some weird strings
+    String username = "user";
+    String weirdJob = "Value has \n new line \n and " +
+                    "dot followed by new line .\n in it +" +
+                    "ends with escape\\";
+    String weirdPath = "Value has characters: " +
+                    "`1234567890-=qwertyuiop[]\\asdfghjkl;'zxcvbnm,./" +
+                    "~!@#$%^&*()_+QWERTYUIOP{}|ASDFGHJKL:\"'ZXCVBNM<>?" +
+                    "\t\b\n\f\"\n in it";
+
+    conf.setUser(username);
+
+    MiniMRCluster mr = null;
+    mr = new MiniMRCluster(2, "file:///", 3, null, null, conf);
+
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobHistory jh = jt.getJobHistory();
+
+    jh.init(jt, conf, "localhost", 1234);
+    JobID jobId = JobID.forName("job_200809171136_0001");
+    jh.setupEventWriter(jobId, conf);
+    JobSubmittedEvent jse =
+      new JobSubmittedEvent(jobId, weirdJob, username, 12345, weirdPath);
+    jh.logEvent(jse, jobId);
+
+    JobFinishedEvent jfe =
+      new JobFinishedEvent(jobId, 12346, 1, 1, 0, 0, new Counters());
+    jh.logEvent(jfe, jobId);
+    jh.closeWriter(jobId);
+
+    // Try to write one more event now, should not fail
+    TaskID tid = TaskID.forName("task_200809171136_0001_m_000002");
+    TaskFinishedEvent tfe =
+      new TaskFinishedEvent(tid, 0, TaskType.MAP, "", null);
+    boolean caughtException = false;
+
+    try {
+      jh.logEvent(tfe, jobId);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+
+    assertFalse("Writing an event after closing event writer is not handled",
+        caughtException);
+
+    String historyFileName = jobId.toString() + "_" + username;
+    Path historyFilePath = new Path (historyDir.toString(),
+      historyFileName);
+
+    System.out.println("History File is " + historyFilePath.toString());
+
+    JobHistoryParser parser =
+      new JobHistoryParser(fs, historyFilePath);
+
+    JobHistoryParser.JobInfo jobInfo = parser.parse();
+
+    assertTrue (jobInfo.getUsername().equals(username));
+    assertTrue(jobInfo.getJobname().equals(weirdJob));
+    assertTrue(jobInfo.getJobConfPath().equals(weirdPath));
+
+    if (mr != null) {
+      mr.shutdown();
+    }
+  }
+}



Mime
View raw message