Subject: svn commit: r1327724 [2/4] - in /hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-a...
Date: Wed, 18 Apr 2012 23:35:42 -0000
To: mapreduce-commits@hadoop.apache.org
From: todd@apache.org
Message-Id: <20120418233548.05B342388C18@eris.apache.org>

Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1327724&r1=1327723&r2=1327724&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Apr 18 23:35:30 2012
@@ -183,8 +183,6 @@ public interface MRJobConfig {

   public static final String MAP_JAVA_OPTS = "mapreduce.map.java.opts";

-  public static final String MAP_ULIMIT = "mapreduce.map.ulimit";
-
   public static final String MAP_MAX_ATTEMPTS = "mapreduce.map.maxattempts";

   public static final String MAP_DEBUG_SCRIPT = "mapreduce.map.debug.script";
@@ -243,8 +241,6 @@ public interface MRJobConfig {

   public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";

-  public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit";
-
   public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";

   public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";

Modified:
hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Apr 18 23:35:30 2012 @@ -276,6 +276,17 @@ public class JobHistoryParser { attemptInfo.shuffleFinishTime = event.getFinishTime(); attemptInfo.sortFinishTime = event.getFinishTime(); attemptInfo.mapFinishTime = event.getFinishTime(); + if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status)) + { + //this is a successful task + if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId())) + { + // the failed attempt is the one that made this task successful + // so its no longer successful + taskInfo.status = null; + // not resetting the other fields set in handleTaskFinishedEvent() + } + } } private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { @@ -299,6 +310,7 @@ public class JobHistoryParser { taskInfo.counters = event.getCounters(); taskInfo.finishTime = event.getFinishTime(); taskInfo.status = TaskStatus.State.SUCCEEDED.toString(); + taskInfo.successfulAttemptId = event.getSuccessfulTaskAttemptId(); } private void handleTaskUpdatedEvent(TaskUpdatedEvent event) { @@ -514,6 +526,7 @@ public class JobHistoryParser { String status; String error; TaskAttemptID failedDueToAttemptId; + TaskAttemptID successfulAttemptId; Map attemptsMap; public TaskInfo() { @@ -554,6 +567,10 @@ public class JobHistoryParser { public TaskAttemptID getFailedDueToAttemptId() { return failedDueToAttemptId; } + /** @return the attempt Id that caused this task to succeed */ + public TaskAttemptID getSuccessfulAttemptId() { + return successfulAttemptId; + } /** @return the error */ public String getError() { return error; } /** @return the map of all attempts for this task */ Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java Wed Apr 18 23:35:30 2012 @@ -22,6 +22,7 @@ import org.apache.avro.util.Utf8; 
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; @@ -36,6 +37,7 @@ public class TaskFinishedEvent implement private TaskFinished datum = null; private TaskID taskid; + private TaskAttemptID successfulAttemptId; private long finishTime; private TaskType taskType; private String status; @@ -44,15 +46,17 @@ public class TaskFinishedEvent implement /** * Create an event to record the successful completion of a task * @param id Task ID + * @param attemptId Task Attempt ID of the successful attempt for this task * @param finishTime Finish time of the task * @param taskType Type of the task * @param status Status string * @param counters Counters for the task */ - public TaskFinishedEvent(TaskID id, long finishTime, + public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime, TaskType taskType, String status, Counters counters) { this.taskid = id; + this.successfulAttemptId = attemptId; this.finishTime = finishTime; this.taskType = taskType; this.status = status; @@ -65,6 +69,10 @@ public class TaskFinishedEvent implement if (datum == null) { datum = new TaskFinished(); datum.taskid = new Utf8(taskid.toString()); + if(successfulAttemptId != null) + { + datum.successfulAttemptId = new Utf8(successfulAttemptId.toString()); + } datum.finishTime = finishTime; datum.counters = EventWriter.toAvro(counters); datum.taskType = new Utf8(taskType.name()); @@ -76,6 +84,10 @@ public class TaskFinishedEvent implement public void setDatum(Object oDatum) { this.datum = (TaskFinished)oDatum; this.taskid = TaskID.forName(datum.taskid.toString()); + if (datum.successfulAttemptId != null) { + this.successfulAttemptId = TaskAttemptID + .forName(datum.successfulAttemptId.toString()); + } this.finishTime = datum.finishTime; this.taskType = TaskType.valueOf(datum.taskType.toString()); this.status = datum.status.toString(); @@ -84,6 +96,14 @@ public class TaskFinishedEvent implement /** Get task id */ public TaskID getTaskId() { return TaskID.forName(taskid.toString()); } + /** Get successful task attempt id */ + public TaskAttemptID getSuccessfulTaskAttemptId() { + if(successfulAttemptId != null) + { + return TaskAttemptID.forName(successfulAttemptId.toString()); + } + return null; + } /** Get the task finish time */ public long getFinishTime() { return finishTime; } /** Get task counters */ Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Wed Apr 18 23:35:30 2012 @@ -314,8 +314,6 @@ public class ConfigUtil { new String[] 
{MRJobConfig.MAP_ENV}); Configuration.addDeprecation("mapred.map.child.java.opts", new String[] {MRJobConfig.MAP_JAVA_OPTS}); - Configuration.addDeprecation("mapred.map.child.ulimit", - new String[] {MRJobConfig.MAP_ULIMIT}); Configuration.addDeprecation("mapred.map.max.attempts", new String[] {MRJobConfig.MAP_MAX_ATTEMPTS}); Configuration.addDeprecation("mapred.map.task.debug.script", @@ -362,8 +360,6 @@ public class ConfigUtil { new String[] {MRJobConfig.REDUCE_ENV}); Configuration.addDeprecation("mapred.reduce.child.java.opts", new String[] {MRJobConfig.REDUCE_JAVA_OPTS}); - Configuration.addDeprecation("mapred.reduce.child.ulimit", - new String[] {MRJobConfig.REDUCE_ULIMIT}); Configuration.addDeprecation("mapred.reduce.max.attempts", new String[] {MRJobConfig.REDUCE_MAX_ATTEMPTS}); Configuration.addDeprecation("mapred.reduce.parallel.copies", Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Wed Apr 18 23:35:30 2012 @@ -411,9 +411,6 @@ /tmp and to set the heap maximum to be a gigabyte, pass a 'value' of: -Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc - The configuration variable mapred.child.ulimit can be used to control the - maximum virtual memory of the child processes. - Usage of -Djava.library.path can cause programs to no longer function if hadoop native libraries are used. These values should instead be set as part of LD_LIBRARY_PATH in the map / reduce JVM env using the mapreduce.map.env and @@ -432,20 +429,6 @@ - mapred.child.ulimit - - The maximum virtual memory, in KB, of a process launched by the - Map-Reduce framework. This can be used to control both the Mapper/Reducer - tasks and applications using Hadoop Pipes, Hadoop Streaming etc. - By default it is left unspecified to let cluster admins control it via - limits.conf and other such relevant mechanisms. - - Note: mapred.child.ulimit must be greater than or equal to the -Xmx passed to - JavaVM, else the VM might not start. 
- - - - mapreduce.admin.user.env LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native Expert: Additional execution environment entries for Propchange: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ------------------------------------------------------------------------------ Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1324567-1327718 Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java Wed Apr 18 23:35:30 2012 @@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.job.Job; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; -import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.yarn.YarnException; @@ -82,32 +82,41 @@ public class CachedHistoryStorage extend super(CachedHistoryStorage.class.getName()); } - private Job loadJob(MetaInfo metaInfo) { + private Job loadJob(HistoryFileInfo fileInfo) { try { - Job job = hsManager.loadJob(metaInfo); + Job job = fileInfo.loadJob(); if (LOG.isDebugEnabled()) { LOG.debug("Adding " + job.getID() + " to loaded job cache"); } + // We can clobber results here, but that should be OK, because it only + // means that we may have two identical copies of the same job floating + // around for a while. 
loadedJobCache.put(job.getID(), job); return job; } catch (IOException e) { throw new YarnException( - "Could not find/load job: " + metaInfo.getJobId(), e); + "Could not find/load job: " + fileInfo.getJobId(), e); } } @Override - public synchronized Job getFullJob(JobId jobId) { + public Job getFullJob(JobId jobId) { if (LOG.isDebugEnabled()) { LOG.debug("Looking for Job " + jobId); } try { - Job result = loadedJobCache.get(jobId); - if (result == null) { - MetaInfo metaInfo = hsManager.getMetaInfo(jobId); - if (metaInfo != null) { - result = loadJob(metaInfo); + HistoryFileInfo fileInfo = hsManager.getFileInfo(jobId); + Job result = null; + if (fileInfo != null) { + result = loadedJobCache.get(jobId); + if (result == null) { + result = loadJob(fileInfo); + } else if(fileInfo.isDeleted()) { + loadedJobCache.remove(jobId); + result = null; } + } else { + loadedJobCache.remove(jobId); } return result; } catch (IOException e) { @@ -120,25 +129,20 @@ public class CachedHistoryStorage extend LOG.debug("Called getAllPartialJobs()"); SortedMap result = new TreeMap(); try { - for (MetaInfo mi : hsManager.getAllMetaInfo()) { + for (HistoryFileInfo mi : hsManager.getAllFileInfo()) { if (mi != null) { JobId id = mi.getJobId(); result.put(id, new PartialJob(mi.getJobIndexInfo(), id)); } } } catch (IOException e) { - LOG.warn("Error trying to scan for all MetaInfos", e); + LOG.warn("Error trying to scan for all FileInfos", e); throw new YarnException(e); } return result; } @Override - public void jobRemovedFromHDFS(JobId jobId) { - loadedJobCache.remove(jobId); - } - - @Override public JobsInfo getPartialJobs(Long offset, Long count, String user, String queue, Long sBegin, Long sEnd, Long fBegin, Long fEnd, JobState jobState) { @@ -173,6 +177,7 @@ public class CachedHistoryStorage extend if (end < 0) { // due to overflow end = Long.MAX_VALUE; } + for (Job job : jobs) { if (at > end) { break; Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Wed Apr 18 23:35:30 2012 @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.UserGroupInformation; @@ -71,7 +72,7 @@ public class CompletedJob implements org private final Configuration conf; private final JobId jobId; //Can be picked from JobInfo with a conversion. 
private final String user; //Can be picked up from JobInfo - private final Path confFile; + private final HistoryFileInfo info; private JobInfo jobInfo; private JobReport report; AtomicBoolean tasksLoaded = new AtomicBoolean(false); @@ -84,13 +85,14 @@ public class CompletedJob implements org public CompletedJob(Configuration conf, JobId jobId, Path historyFile, - boolean loadTasks, String userName, Path confFile, JobACLsManager aclsMgr) + boolean loadTasks, String userName, HistoryFileInfo info, + JobACLsManager aclsMgr) throws IOException { LOG.info("Loading job: " + jobId + " from file: " + historyFile); this.conf = conf; this.jobId = jobId; this.user = userName; - this.confFile = confFile; + this.info = info; this.aclsMgr = aclsMgr; loadFullHistoryData(loadTasks, historyFile); } @@ -134,7 +136,7 @@ public class CompletedJob implements org report.setUser(jobInfo.getUsername()); report.setMapProgress((float) getCompletedMaps() / getTotalMaps()); report.setReduceProgress((float) getCompletedReduces() / getTotalReduces()); - report.setJobFile(confFile.toString()); + report.setJobFile(getConfFile().toString()); String historyUrl = "N/A"; try { historyUrl = JobHistoryUtils.getHistoryUrl(conf, jobId.getAppId()); @@ -392,7 +394,16 @@ public class CompletedJob implements org */ @Override public Path getConfFile() { - return confFile; + return info.getConfFile(); + } + + /* + * (non-Javadoc) + * @see org.apache.hadoop.mapreduce.v2.app.job.Job#loadConfFile() + */ + @Override + public Configuration loadConfFile() throws IOException { + return info.loadConfFile(); } @Override Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Wed Apr 18 23:35:30 2012 @@ -25,12 +25,17 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,6 +62,8 @@ import org.apache.hadoop.mapreduce.v2.jo import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * This class provides a way to interact with history files in a thread safe * manor. 
@@ -67,33 +74,251 @@ public class HistoryFileManager extends private static final Log LOG = LogFactory.getLog(HistoryFileManager.class); private static final Log SUMMARY_LOG = LogFactory.getLog(JobSummary.class); + private static enum HistoryInfoState { + IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED + }; + private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils .doneSubdirsBeforeSerialTail(); - public static class MetaInfo { + /** + * Maps between a serial number (generated based on jobId) and the timestamp + * component(s) to which it belongs. Facilitates jobId based searches. If a + * jobId is not found in this list - it will not be found. + */ + private static class SerialNumberIndex { + private SortedMap> cache; + private int maxSize; + + public SerialNumberIndex(int maxSize) { + this.cache = new TreeMap>(); + this.maxSize = maxSize; + } + + public synchronized void add(String serialPart, String timestampPart) { + if (!cache.containsKey(serialPart)) { + cache.put(serialPart, new HashSet()); + if (cache.size() > maxSize) { + String key = cache.firstKey(); + LOG.error("Dropping " + key + + " from the SerialNumberIndex. We will no " + + "longer be able to see jobs that are in that serial index for " + + cache.get(key)); + cache.remove(key); + } + } + Set datePartSet = cache.get(serialPart); + datePartSet.add(timestampPart); + } + + public synchronized void remove(String serialPart, String timeStampPart) { + if (cache.containsKey(serialPart)) { + Set set = cache.get(serialPart); + set.remove(timeStampPart); + if (set.isEmpty()) { + cache.remove(serialPart); + } + } + } + + public synchronized Set get(String serialPart) { + Set found = cache.get(serialPart); + if (found != null) { + return new HashSet(found); + } + return null; + } + } + + private static class JobListCache { + private ConcurrentSkipListMap cache; + private int maxSize; + private long maxAge; + + public JobListCache(int maxSize, long maxAge) { + this.maxSize = maxSize; + this.maxAge = maxAge; + this.cache = new ConcurrentSkipListMap(); + } + + public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) { + JobId jobId = fileInfo.getJobIndexInfo().getJobId(); + if (LOG.isDebugEnabled()) { + LOG.debug("Adding " + jobId + " to job list cache with " + + fileInfo.getJobIndexInfo()); + } + HistoryFileInfo old = cache.putIfAbsent(jobId, fileInfo); + if (cache.size() > maxSize) { + //There is a race here, where more then one thread could be trying to + // remove entries. This could result in too many entries being removed + // from the cache. This is considered OK as the size of the cache + // should be rather large, and we would rather have performance over + // keeping the cache size exactly at the maximum. 
+ Iterator keys = cache.navigableKeySet().iterator(); + long cutoff = System.currentTimeMillis() - maxAge; + while(cache.size() > maxSize && keys.hasNext()) { + JobId key = keys.next(); + HistoryFileInfo firstValue = cache.get(key); + if(firstValue != null) { + synchronized(firstValue) { + if (firstValue.isMovePending()) { + if(firstValue.didMoveFail() && + firstValue.jobIndexInfo.getFinishTime() <= cutoff) { + cache.remove(key); + //Now lets try to delete it + try { + firstValue.delete(); + } catch (IOException e) { + LOG.error("Error while trying to delete history files" + + " that could not be moved to done.", e); + } + } else { + LOG.warn("Waiting to remove " + key + + " from JobListCache because it is not in done yet."); + } + } else { + cache.remove(key); + } + } + } + } + } + return old; + } + + public void delete(HistoryFileInfo fileInfo) { + cache.remove(fileInfo.getJobId()); + } + + public Collection values() { + return new ArrayList(cache.values()); + } + + public HistoryFileInfo get(JobId jobId) { + return cache.get(jobId); + } + } + + public class HistoryFileInfo { private Path historyFile; private Path confFile; private Path summaryFile; private JobIndexInfo jobIndexInfo; + private HistoryInfoState state; - public MetaInfo(Path historyFile, Path confFile, Path summaryFile, - JobIndexInfo jobIndexInfo) { + private HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile, + JobIndexInfo jobIndexInfo, boolean isInDone) { this.historyFile = historyFile; this.confFile = confFile; this.summaryFile = summaryFile; this.jobIndexInfo = jobIndexInfo; + state = isInDone ? HistoryInfoState.IN_DONE + : HistoryInfoState.IN_INTERMEDIATE; } - private Path getHistoryFile() { - return historyFile; + private synchronized boolean isMovePending() { + return state == HistoryInfoState.IN_INTERMEDIATE + || state == HistoryInfoState.MOVE_FAILED; } - private Path getConfFile() { - return confFile; + private synchronized boolean didMoveFail() { + return state == HistoryInfoState.MOVE_FAILED; + } + + /** + * @return true if the files backed by this were deleted. + */ + public synchronized boolean isDeleted() { + return state == HistoryInfoState.DELETED; } - private Path getSummaryFile() { - return summaryFile; + private synchronized void moveToDone() throws IOException { + if (!isMovePending()) { + // It was either deleted or is already in done. 
Either way do nothing + return; + } + try { + long completeTime = jobIndexInfo.getFinishTime(); + if (completeTime == 0) { + completeTime = System.currentTimeMillis(); + } + JobId jobId = jobIndexInfo.getJobId(); + + List paths = new ArrayList(2); + if (historyFile == null) { + LOG.info("No file for job-history with " + jobId + " found in cache!"); + } else { + paths.add(historyFile); + } + + if (confFile == null) { + LOG.info("No file for jobConf with " + jobId + " found in cache!"); + } else { + paths.add(confFile); + } + + if (summaryFile == null) { + LOG.info("No summary file for job: " + jobId); + } else { + String jobSummaryString = getJobSummary(intermediateDoneDirFc, + summaryFile); + SUMMARY_LOG.info(jobSummaryString); + LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); + intermediateDoneDirFc.delete(summaryFile, false); + summaryFile = null; + } + + Path targetDir = canonicalHistoryLogPath(jobId, completeTime); + addDirectoryToSerialNumberIndex(targetDir); + makeDoneSubdir(targetDir); + if (historyFile != null) { + Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile + .getName())); + if (!toPath.equals(historyFile)) { + moveToDoneNow(historyFile, toPath); + historyFile = toPath; + } + } + if (confFile != null) { + Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile + .getName())); + if (!toPath.equals(confFile)) { + moveToDoneNow(confFile, toPath); + confFile = toPath; + } + } + state = HistoryInfoState.IN_DONE; + } catch (Throwable t) { + LOG.error("Error while trying to move a job to done", t); + this.state = HistoryInfoState.MOVE_FAILED; + } + } + + /** + * Parse a job from the JobHistoryFile, if the underlying file is not going + * to be deleted. + * + * @return the Job or null if the underlying file was deleted. + * @throws IOException + * if there is an error trying to read the file. + */ + public synchronized Job loadJob() throws IOException { + return new CompletedJob(conf, jobIndexInfo.getJobId(), historyFile, + false, jobIndexInfo.getUser(), this, aclsMgr); + } + + /** + * Return the history file. This should only be used for testing. + * @return the history file. + */ + synchronized Path getHistoryFile() { + return historyFile; + } + + private synchronized void delete() throws IOException { + state = HistoryInfoState.DELETED; + doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); + doneDirFc.delete(doneDirFc.makeQualified(confFile), false); } public JobIndexInfo getJobIndexInfo() { @@ -104,57 +329,35 @@ public class HistoryFileManager extends return jobIndexInfo.getJobId(); } - private void setHistoryFile(Path historyFile) { - this.historyFile = historyFile; - } - - private void setConfFile(Path confFile) { - this.confFile = confFile; + public synchronized Path getConfFile() { + return confFile; } - - private void setSummaryFile(Path summaryFile) { - this.summaryFile = summaryFile; + + public synchronized Configuration loadConfFile() throws IOException { + FileContext fc = FileContext.getFileContext(confFile.toUri(), conf); + Configuration jobConf = new Configuration(false); + jobConf.addResource(fc.open(confFile)); + return jobConf; } } - /** - * Maps between a serial number (generated based on jobId) and the timestamp - * component(s) to which it belongs. Facilitates jobId based searches. If a - * jobId is not found in this list - it will not be found. 
- */ - private final SortedMap> idToDateString = - new TreeMap>(); - // The number of entries in idToDateString - private int dateStringCacheSize; - - // Maintains minimal details for recent jobs (parsed from history file name). - // Sorted on Job Completion Time. - private final SortedMap jobListCache = - new ConcurrentSkipListMap(); - // The number of jobs to maintain in the job list cache. - private int jobListCacheSize; - - // Re-use existing MetaInfo objects if they exist for the specific JobId. - // (synchronization on MetaInfo) - // Check for existence of the object when using iterators. - private final SortedMap intermediateListCache = - new ConcurrentSkipListMap(); + private SerialNumberIndex serialNumberIndex = null; + private JobListCache jobListCache = null; // Maintains a list of known done subdirectories. - private final Set existingDoneSubdirs = new HashSet(); + private final Set existingDoneSubdirs = Collections + .synchronizedSet(new HashSet()); /** * Maintains a mapping between intermediate user directories and the last * known modification time. */ - private Map userDirModificationTimeMap = - new HashMap(); + private Map userDirModificationTimeMap = new HashMap(); private JobACLsManager aclsMgr; private Configuration conf; - // TODO Remove me!!!! private boolean debugMode; private String serialNumberFormat; @@ -165,6 +368,9 @@ public class HistoryFileManager extends private FileContext intermediateDoneDirFc; // Intermediate Done Dir // FileContext + private ThreadPoolExecutor moveToDoneExecutor = null; + private long maxHistoryAge = 0; + public HistoryFileManager() { super(HistoryFileManager.class.getName()); } @@ -211,12 +417,25 @@ public class HistoryFileManager extends this.aclsMgr = new JobACLsManager(conf); - jobListCacheSize = conf.getInt(JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, - JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE); + maxHistoryAge = conf.getLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, + JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); + + jobListCache = new JobListCache(conf.getInt( + JHAdminConfig.MR_HISTORY_JOBLIST_CACHE_SIZE, + JHAdminConfig.DEFAULT_MR_HISTORY_JOBLIST_CACHE_SIZE), + maxHistoryAge); - dateStringCacheSize = conf.getInt( + serialNumberIndex = new SerialNumberIndex(conf.getInt( JHAdminConfig.MR_HISTORY_DATESTRING_CACHE_SIZE, - JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE); + JHAdminConfig.DEFAULT_MR_HISTORY_DATESTRING_CACHE_SIZE)); + + int numMoveThreads = conf.getInt( + JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, + JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); + ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( + "MoveIntermediateToDone Thread #%d").build(); + moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads, + 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); super.init(conf); } @@ -249,6 +468,7 @@ public class HistoryFileManager extends void initExisting() throws IOException { LOG.info("Initializing Existing Jobs..."); List timestampedDirList = findTimestampedDirectories(); + // Sort first just so insertion is in a consistent order Collections.sort(timestampedDirList); for (FileStatus fs : timestampedDirList) { // TODO Could verify the correct format for these directories. @@ -271,16 +491,7 @@ public class HistoryFileManager extends + serialDirPath.toString() + ". 
Continuing with next"); return; } - synchronized (idToDateString) { - // TODO make this thread safe without the synchronize - if (idToDateString.containsKey(serialPart)) { - Set set = idToDateString.get(serialPart); - set.remove(timeStampPart); - if (set.isEmpty()) { - idToDateString.remove(serialPart); - } - } - } + serialNumberIndex.remove(serialPart, timeStampPart); } private void addDirectoryToSerialNumberIndex(Path serialDirPath) { @@ -299,21 +510,7 @@ public class HistoryFileManager extends LOG.warn("Could not find serial portion from path: " + serialDirPath.toString() + ". Continuing with next"); } - addToSerialNumberIndex(serialPart, timestampPart); - } - - private void addToSerialNumberIndex(String serialPart, String timestampPart) { - synchronized (idToDateString) { - // TODO make this thread safe without the synchronize - if (!idToDateString.containsKey(serialPart)) { - idToDateString.put(serialPart, new HashSet()); - if (idToDateString.size() > dateStringCacheSize) { - idToDateString.remove(idToDateString.firstKey()); - } - Set datePartSet = idToDateString.get(serialPart); - datePartSet.add(timestampPart); - } - } + serialNumberIndex.add(serialPart, timestampPart); } private void addDirectoryToJobListCache(Path path) throws IOException { @@ -332,10 +529,10 @@ public class HistoryFileManager extends .getIntermediateConfFileName(jobIndexInfo.getJobId()); String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() - .getParent(), confFileName), new Path(fs.getPath().getParent(), - summaryFileName), jobIndexInfo); - addToJobListCache(metaInfo); + HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs + .getPath().getParent(), confFileName), new Path(fs.getPath() + .getParent(), summaryFileName), jobIndexInfo, true); + jobListCache.addIfAbsent(fileInfo); } } @@ -371,25 +568,18 @@ public class HistoryFileManager extends return fsList; } - private void addToJobListCache(MetaInfo metaInfo) { - JobId jobId = metaInfo.getJobIndexInfo().getJobId(); - if (LOG.isDebugEnabled()) { - LOG.debug("Adding " + jobId + " to job list cache with " - + metaInfo.getJobIndexInfo()); - } - jobListCache.put(jobId, metaInfo); - if (jobListCache.size() > jobListCacheSize) { - jobListCache.remove(jobListCache.firstKey()); - } - } - /** * Scans the intermediate directory to find user directories. Scans these for - * history files if the modification time for the directory has changed. + * history files if the modification time for the directory has changed. Once + * it finds history files it starts the process of moving them to the done + * directory. * * @throws IOException + * if there was a error while scanning */ - private void scanIntermediateDirectory() throws IOException { + void scanIntermediateDirectory() throws IOException { + // TODO it would be great to limit how often this happens, except in the + // case where we are looking for a particular job. 
List userDirList = JobHistoryUtils.localGlobber( intermediateDoneDirFc, intermediateDoneDirPath, ""); @@ -405,7 +595,12 @@ public class HistoryFileManager extends } } if (shouldScan) { - scanIntermediateDirectory(userDir.getPath()); + try { + scanIntermediateDirectory(userDir.getPath()); + } catch (IOException e) { + LOG.error("Error while trying to scan the directory " + + userDir.getPath(), e); + } } } } @@ -426,11 +621,33 @@ public class HistoryFileManager extends .getIntermediateConfFileName(jobIndexInfo.getJobId()); String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() - .getParent(), confFileName), new Path(fs.getPath().getParent(), - summaryFileName), jobIndexInfo); - if (!intermediateListCache.containsKey(jobIndexInfo.getJobId())) { - intermediateListCache.put(jobIndexInfo.getJobId(), metaInfo); + HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs + .getPath().getParent(), confFileName), new Path(fs.getPath() + .getParent(), summaryFileName), jobIndexInfo, false); + + final HistoryFileInfo old = jobListCache.addIfAbsent(fileInfo); + if (old == null || old.didMoveFail()) { + final HistoryFileInfo found = (old == null) ? fileInfo : old; + long cutoff = System.currentTimeMillis() - maxHistoryAge; + if(found.getJobIndexInfo().getFinishTime() <= cutoff) { + try { + found.delete(); + } catch (IOException e) { + LOG.warn("Error cleaning up a HistoryFile that is out of date.", e); + } + } else { + moveToDoneExecutor.execute(new Runnable() { + @Override + public void run() { + try { + found.moveToDone(); + } catch (IOException e) { + LOG.info("Failed to process fileInfo for job: " + + found.getJobId(), e); + } + } + }); + } } } } @@ -442,11 +659,11 @@ public class HistoryFileManager extends * fileStatus list of Job History Files. * @param jobId * The JobId to find. - * @return A MetaInfo object for the jobId, null if not found. + * @return A FileInfo object for the jobId, null if not found. 
* @throws IOException */ - private MetaInfo getJobMetaInfo(List fileStatusList, JobId jobId) - throws IOException { + private HistoryFileInfo getJobFileInfo(List fileStatusList, + JobId jobId) throws IOException { for (FileStatus fs : fileStatusList) { JobIndexInfo jobIndexInfo = FileNameIndexUtils.getIndexInfo(fs.getPath() .getName()); @@ -455,10 +672,10 @@ public class HistoryFileManager extends .getIntermediateConfFileName(jobIndexInfo.getJobId()); String summaryFileName = JobHistoryUtils .getIntermediateSummaryFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(fs.getPath(), new Path(fs.getPath() - .getParent(), confFileName), new Path(fs.getPath().getParent(), - summaryFileName), jobIndexInfo); - return metaInfo; + HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path( + fs.getPath().getParent(), confFileName), new Path(fs.getPath() + .getParent(), summaryFileName), jobIndexInfo, true); + return fileInfo; } } return null; @@ -474,175 +691,51 @@ public class HistoryFileManager extends * @return * @throws IOException */ - private MetaInfo scanOldDirsForJob(JobId jobId) throws IOException { + private HistoryFileInfo scanOldDirsForJob(JobId jobId) throws IOException { int jobSerialNumber = JobHistoryUtils.jobSerialNumber(jobId); String boxedSerialNumber = String.valueOf(jobSerialNumber); - Set dateStringSet; - synchronized (idToDateString) { - Set found = idToDateString.get(boxedSerialNumber); - if (found == null) { - return null; - } else { - dateStringSet = new HashSet(found); - } + Set dateStringSet = serialNumberIndex.get(boxedSerialNumber); + if (dateStringSet == null) { + return null; } for (String timestampPart : dateStringSet) { Path logDir = canonicalHistoryLogPath(jobId, timestampPart); List fileStatusList = scanDirectoryForHistoryFiles(logDir, doneDirFc); - MetaInfo metaInfo = getJobMetaInfo(fileStatusList, jobId); - if (metaInfo != null) { - return metaInfo; + HistoryFileInfo fileInfo = getJobFileInfo(fileStatusList, jobId); + if (fileInfo != null) { + return fileInfo; } } return null; } - /** - * Checks for the existence of the job history file in the intermediate - * directory. - * - * @param jobId - * @return - * @throws IOException - */ - private MetaInfo scanIntermediateForJob(JobId jobId) throws IOException { - scanIntermediateDirectory(); - return intermediateListCache.get(jobId); - } - - /** - * Parse a job from the JobHistoryFile, if the underlying file is not going to - * be deleted. - * - * @param metaInfo - * the where the JobHistory is stored. - * @return the Job or null if the underlying file was deleted. - * @throws IOException - * if there is an error trying to read the file. - */ - public Job loadJob(MetaInfo metaInfo) throws IOException { - return new CompletedJob(conf, metaInfo.getJobIndexInfo().getJobId(), - metaInfo.getHistoryFile(), false, metaInfo.getJobIndexInfo().getUser(), - metaInfo.getConfFile(), aclsMgr); - } - - public Collection getAllMetaInfo() throws IOException { + public Collection getAllFileInfo() throws IOException { scanIntermediateDirectory(); - ArrayList result = new ArrayList(); - result.addAll(intermediateListCache.values()); - result.addAll(jobListCache.values()); - return result; + return jobListCache.values(); } - Collection getIntermediateMetaInfos() throws IOException { - scanIntermediateDirectory(); - return intermediateListCache.values(); - } - - public MetaInfo getMetaInfo(JobId jobId) throws IOException { - // MetaInfo available in cache. 
- MetaInfo metaInfo = null; - if (jobListCache.containsKey(jobId)) { - metaInfo = jobListCache.get(jobId); - } - - if (metaInfo != null) { - return metaInfo; + public HistoryFileInfo getFileInfo(JobId jobId) throws IOException { + // FileInfo available in cache. + HistoryFileInfo fileInfo = jobListCache.get(jobId); + if (fileInfo != null) { + return fileInfo; } - - // MetaInfo not available. Check intermediate directory for meta info. - metaInfo = scanIntermediateForJob(jobId); - if (metaInfo != null) { - return metaInfo; + // OK so scan the intermediate to be sure we did not lose it that way + scanIntermediateDirectory(); + fileInfo = jobListCache.get(jobId); + if (fileInfo != null) { + return fileInfo; } // Intermediate directory does not contain job. Search through older ones. - metaInfo = scanOldDirsForJob(jobId); - if (metaInfo != null) { - return metaInfo; + fileInfo = scanOldDirsForJob(jobId); + if (fileInfo != null) { + return fileInfo; } return null; } - void moveToDone(MetaInfo metaInfo) throws IOException { - long completeTime = metaInfo.getJobIndexInfo().getFinishTime(); - if (completeTime == 0) - completeTime = System.currentTimeMillis(); - JobId jobId = metaInfo.getJobIndexInfo().getJobId(); - - List paths = new ArrayList(); - Path historyFile = metaInfo.getHistoryFile(); - if (historyFile == null) { - LOG.info("No file for job-history with " + jobId + " found in cache!"); - } else { - paths.add(historyFile); - } - - Path confFile = metaInfo.getConfFile(); - if (confFile == null) { - LOG.info("No file for jobConf with " + jobId + " found in cache!"); - } else { - paths.add(confFile); - } - - // TODO Check all mi getters and setters for the conf path - Path summaryFile = metaInfo.getSummaryFile(); - if (summaryFile == null) { - LOG.info("No summary file for job: " + jobId); - } else { - try { - String jobSummaryString = getJobSummary(intermediateDoneDirFc, - summaryFile); - SUMMARY_LOG.info(jobSummaryString); - LOG.info("Deleting JobSummary file: [" + summaryFile + "]"); - intermediateDoneDirFc.delete(summaryFile, false); - metaInfo.setSummaryFile(null); - } catch (IOException e) { - LOG.warn("Failed to process summary file: [" + summaryFile + "]"); - throw e; - } - } - - Path targetDir = canonicalHistoryLogPath(jobId, completeTime); - addDirectoryToSerialNumberIndex(targetDir); - try { - makeDoneSubdir(targetDir); - } catch (IOException e) { - LOG.warn("Failed creating subdirectory: " + targetDir - + " while attempting to move files for jobId: " + jobId); - throw e; - } - synchronized (metaInfo) { - if (historyFile != null) { - Path toPath = doneDirFc.makeQualified(new Path(targetDir, historyFile - .getName())); - try { - moveToDoneNow(historyFile, toPath); - } catch (IOException e) { - LOG.warn("Failed to move file: " + historyFile + " for jobId: " - + jobId); - throw e; - } - metaInfo.setHistoryFile(toPath); - } - if (confFile != null) { - Path toPath = doneDirFc.makeQualified(new Path(targetDir, confFile - .getName())); - try { - moveToDoneNow(confFile, toPath); - } catch (IOException e) { - LOG.warn("Failed to move file: " + historyFile + " for jobId: " - + jobId); - throw e; - } - metaInfo.setConfFile(toPath); - } - } - addToJobListCache(metaInfo); - intermediateListCache.remove(jobId); - } - private void moveToDoneNow(final Path src, final Path target) throws IOException { LOG.info("Moving " + src.toString() + " to " + target.toString()); @@ -658,20 +751,9 @@ public class HistoryFileManager extends } private void makeDoneSubdir(Path path) throws IOException { - boolean 
existsInExistingCache = false; - synchronized (existingDoneSubdirs) { - if (existingDoneSubdirs.contains(path)) - existsInExistingCache = true; - } try { doneDirFc.getFileStatus(path); - if (!existsInExistingCache) { - existingDoneSubdirs.add(path); - if (LOG.isDebugEnabled()) { - LOG.debug("JobHistory.maybeMakeSubdirectory -- We believed " + path - + " already existed, but it didn't."); - } - } + existingDoneSubdirs.add(path); } catch (FileNotFoundException fnfE) { try { FsPermission fsp = new FsPermission( @@ -685,11 +767,8 @@ public class HistoryFileManager extends + ", " + fsp); doneDirFc.setPermission(path, fsp); } - synchronized (existingDoneSubdirs) { - existingDoneSubdirs.add(path); - } - } catch (FileAlreadyExistsException faeE) { - // Nothing to do. + existingDoneSubdirs.add(path); + } catch (FileAlreadyExistsException faeE) { // Nothing to do. } } } @@ -713,16 +792,22 @@ public class HistoryFileManager extends return finishTime; } - private void deleteJobFromDone(MetaInfo metaInfo) throws IOException { - jobListCache.remove(metaInfo.getJobId()); - doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getHistoryFile()), false); - doneDirFc.delete(doneDirFc.makeQualified(metaInfo.getConfFile()), false); + private void deleteJobFromDone(HistoryFileInfo fileInfo) throws IOException { + jobListCache.delete(fileInfo); + fileInfo.delete(); } + /** + * Clean up older history files. + * + * @throws IOException + * on any error trying to remove the entries. + */ @SuppressWarnings("unchecked") - void clean(long cutoff, HistoryStorage storage) throws IOException { + void clean() throws IOException { // TODO this should be replaced by something that knows about the directory // structure and will put less of a load on HDFS. + long cutoff = System.currentTimeMillis() - maxHistoryAge; boolean halted = false; // TODO Delete YYYY/MM/DD directories. List serialDirList = findTimestampedDirectories(); @@ -737,13 +822,17 @@ public class HistoryFileManager extends long effectiveTimestamp = getEffectiveTimestamp( jobIndexInfo.getFinishTime(), historyFile); if (effectiveTimestamp <= cutoff) { - String confFileName = JobHistoryUtils - .getIntermediateConfFileName(jobIndexInfo.getJobId()); - MetaInfo metaInfo = new MetaInfo(historyFile.getPath(), new Path( - historyFile.getPath().getParent(), confFileName), null, - jobIndexInfo); - storage.jobRemovedFromHDFS(metaInfo.getJobId()); - deleteJobFromDone(metaInfo); + HistoryFileInfo fileInfo = this.jobListCache.get(jobIndexInfo + .getJobId()); + if (fileInfo == null) { + String confFileName = JobHistoryUtils + .getIntermediateConfFileName(jobIndexInfo.getJobId()); + + fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path( + historyFile.getPath().getParent(), confFileName), null, + jobIndexInfo, true); + } + deleteJobFromDone(fileInfo); } else { halted = true; break; @@ -752,9 +841,7 @@ public class HistoryFileManager extends if (!halted) { doneDirFc.delete(doneDirFc.makeQualified(serialDir.getPath()), true); removeDirectoryFromSerialNumberIndex(serialDir.getPath()); - synchronized (existingDoneSubdirs) { - existingDoneSubdirs.remove(serialDir.getPath()); - } + existingDoneSubdirs.remove(serialDir.getPath()); } else { break; // Don't scan any more directories. 
} Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryStorage.java Wed Apr 18 23:35:30 2012 @@ -28,7 +28,12 @@ import org.apache.hadoop.classification. import org.apache.hadoop.classification.InterfaceStability; /** - * Provides an API to query jobs that have finished. + * Provides an API to query jobs that have finished. + * + * For those implementing this API be aware that there is no feedback when + * files are removed from HDFS. You may rely on HistoryFileManager to help + * you know when that has happened if you have not made a complete backup of + * the data stored on HDFS. */ @InterfaceAudience.Public @InterfaceStability.Unstable @@ -71,10 +76,4 @@ public interface HistoryStorage { * @return the job, or null if it is not found. */ Job getFullJob(JobId jobId); - - /** - * Informs the Storage that a job has been removed from HDFS - * @param jobId the ID of the job that was removed. - */ - void jobRemovedFromHDFS(JobId jobId); } Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Wed Apr 18 23:35:30 2012 @@ -21,10 +21,7 @@ package org.apache.hadoop.mapreduce.v2.h import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -37,7 +34,7 @@ import org.apache.hadoop.mapreduce.TypeC import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.job.Job; -import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.MetaInfo; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.webapp.dao.JobsInfo; import 
org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.util.ReflectionUtils; @@ -66,15 +63,9 @@ public class JobHistory extends Abstract // Time interval for the move thread. private long moveThreadInterval; - // Number of move threads. - private int numMoveThreads; - private Configuration conf; - private Thread moveIntermediateToDoneThread = null; - private MoveIntermediateToDoneRunnable moveIntermediateToDoneRunnable = null; - - private ScheduledThreadPoolExecutor cleanerScheduledExecutor = null; + private ScheduledThreadPoolExecutor scheduledExecutor = null; private HistoryStorage storage = null; private HistoryFileManager hsManager = null; @@ -91,8 +82,6 @@ public class JobHistory extends Abstract moveThreadInterval = conf.getLong( JHAdminConfig.MR_HISTORY_MOVE_INTERVAL_MS, JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_INTERVAL_MS); - numMoveThreads = conf.getInt(JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT, - JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); hsManager = new HistoryFileManager(); hsManager.init(conf); @@ -120,27 +109,22 @@ public class JobHistory extends Abstract ((Service) storage).start(); } - // Start moveIntermediatToDoneThread - moveIntermediateToDoneRunnable = new MoveIntermediateToDoneRunnable( - moveThreadInterval, numMoveThreads); - moveIntermediateToDoneThread = new Thread(moveIntermediateToDoneRunnable); - moveIntermediateToDoneThread.setName("MoveIntermediateToDoneScanner"); - moveIntermediateToDoneThread.start(); + scheduledExecutor = new ScheduledThreadPoolExecutor(2, + new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d") + .build()); + + scheduledExecutor.scheduleAtFixedRate(new MoveIntermediateToDoneRunnable(), + moveThreadInterval, moveThreadInterval, TimeUnit.MILLISECONDS); // Start historyCleaner boolean startCleanerService = conf.getBoolean( JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, true); if (startCleanerService) { - long maxAgeOfHistoryFiles = conf.getLong( - JHAdminConfig.MR_HISTORY_MAX_AGE_MS, - JHAdminConfig.DEFAULT_MR_HISTORY_MAX_AGE); - cleanerScheduledExecutor = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryBuilder().setNameFormat("LogCleaner").build()); long runInterval = conf.getLong( JHAdminConfig.MR_HISTORY_CLEANER_INTERVAL_MS, JHAdminConfig.DEFAULT_MR_HISTORY_CLEANER_INTERVAL_MS); - cleanerScheduledExecutor - .scheduleAtFixedRate(new HistoryCleaner(maxAgeOfHistoryFiles), + scheduledExecutor + .scheduleAtFixedRate(new HistoryCleaner(), 30 * 1000l, runInterval, TimeUnit.MILLISECONDS); } super.start(); @@ -149,24 +133,12 @@ public class JobHistory extends Abstract @Override public void stop() { LOG.info("Stopping JobHistory"); - if (moveIntermediateToDoneThread != null) { - LOG.info("Stopping move thread"); - moveIntermediateToDoneRunnable.stop(); - moveIntermediateToDoneThread.interrupt(); - try { - LOG.info("Joining on move thread"); - moveIntermediateToDoneThread.join(); - } catch (InterruptedException e) { - LOG.info("Interrupted while stopping move thread"); - } - } - - if (cleanerScheduledExecutor != null) { - LOG.info("Stopping History Cleaner"); - cleanerScheduledExecutor.shutdown(); + if (scheduledExecutor != null) { + LOG.info("Stopping History Cleaner/Move To Done"); + scheduledExecutor.shutdown(); boolean interrupted = false; long currentTime = System.currentTimeMillis(); - while (!cleanerScheduledExecutor.isShutdown() + while (!scheduledExecutor.isShutdown() && System.currentTimeMillis() > currentTime + 1000l && !interrupted) { try { Thread.sleep(20); @@ -174,8 +146,10 @@ public class 
JobHistory extends Abstract interrupted = true; } } - if (!cleanerScheduledExecutor.isShutdown()) { - LOG.warn("HistoryCleanerService shutdown may not have succeeded"); + if (!scheduledExecutor.isShutdown()) { + LOG.warn("HistoryCleanerService/move to done shutdown may not have " + + "succeeded, Forcing a shutdown"); + scheduledExecutor.shutdownNow(); } } if (storage instanceof Service) { @@ -195,68 +169,34 @@ public class JobHistory extends Abstract } private class MoveIntermediateToDoneRunnable implements Runnable { - - private long sleepTime; - private ThreadPoolExecutor moveToDoneExecutor = null; - private boolean running = false; - - public synchronized void stop() { - running = false; - notify(); - } - - MoveIntermediateToDoneRunnable(long sleepTime, int numMoveThreads) { - this.sleepTime = sleepTime; - ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( - "MoveIntermediateToDone Thread #%d").build(); - moveToDoneExecutor = new ThreadPoolExecutor(1, numMoveThreads, 1, - TimeUnit.HOURS, new LinkedBlockingQueue(), tf); - running = true; - } - @Override public void run() { - Thread.currentThread().setName("IntermediateHistoryScanner"); try { - while (true) { - LOG.info("Starting scan to move intermediate done files"); - for (final MetaInfo metaInfo : hsManager.getIntermediateMetaInfos()) { - moveToDoneExecutor.execute(new Runnable() { - @Override - public void run() { - try { - hsManager.moveToDone(metaInfo); - } catch (IOException e) { - LOG.info( - "Failed to process metaInfo for job: " - + metaInfo.getJobId(), e); - } - } - }); - } - synchronized (this) { - try { - this.wait(sleepTime); - } catch (InterruptedException e) { - LOG.info("IntermediateHistoryScannerThread interrupted"); - } - if (!running) { - break; - } - } - } + LOG.info("Starting scan to move intermediate done files"); + hsManager.scanIntermediateDirectory(); } catch (IOException e) { - LOG.warn("Unable to get a list of intermediate files to be moved"); - // TODO Shut down the entire process!!!! + LOG.error("Error while scanning intermediate done dir ", e); } } } + + private class HistoryCleaner implements Runnable { + public void run() { + LOG.info("History Cleaner started"); + try { + hsManager.clean(); + } catch (IOException e) { + LOG.warn("Error trying to clean up ", e); + } + LOG.info("History Cleaner complete"); + } + } /** * Helper method for test cases. 
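
The stop() hunk above shuts the shared executor down, waits briefly, and falls back to shutdownNow() if tasks have not drained. The same bounded-shutdown idea can be expressed with awaitTermination; this sketch is a simplification for illustration, not the committed polling loop.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public final class ExecutorShutdownSketch {
      private ExecutorShutdownSketch() {}

      // Stop accepting new work, give in-flight tasks a short grace period,
      // then interrupt whatever is left.
      public static void shutdownQuietly(ExecutorService executor, long waitMs) {
        executor.shutdown();
        try {
          if (!executor.awaitTermination(waitMs, TimeUnit.MILLISECONDS)) {
            executor.shutdownNow();
          }
        } catch (InterruptedException e) {
          executor.shutdownNow();
          Thread.currentThread().interrupt();
        }
      }
    }
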
*/ - MetaInfo getJobMetaInfo(JobId jobId) throws IOException { - return hsManager.getMetaInfo(jobId); + HistoryFileInfo getJobFileInfo(JobId jobId) throws IOException { + return hsManager.getFileInfo(jobId); } @Override @@ -313,25 +253,6 @@ public class JobHistory extends Abstract fBegin, fEnd, jobState); } - public class HistoryCleaner implements Runnable { - long maxAgeMillis; - - public HistoryCleaner(long maxAge) { - this.maxAgeMillis = maxAge; - } - - public void run() { - LOG.info("History Cleaner started"); - long cutoff = System.currentTimeMillis() - maxAgeMillis; - try { - hsManager.clean(cutoff, storage); - } catch (IOException e) { - LOG.warn("Error trying to clean up ", e); - } - LOG.info("History Cleaner complete"); - } - } - // TODO AppContext - Not Required private ApplicationAttemptId appAttemptID; Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Wed Apr 18 23:35:30 2012 @@ -21,6 +21,9 @@ package org.apache.hadoop.mapreduce.v2.h import java.util.List; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.JobACL; @@ -37,9 +40,9 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -import clover.org.apache.log4j.Logger; public class PartialJob implements org.apache.hadoop.mapreduce.v2.app.job.Job { + private static final Log LOG = LogFactory.getLog(PartialJob.class); private JobIndexInfo jobIndexInfo = null; private JobId jobId = null; @@ -78,8 +81,7 @@ public class PartialJob implements org.a } catch (Exception e) { // Meant for use by the display UI. Exception would prevent it from being // rendered.e Defaulting to KILLED - Logger.getLogger(this.getClass().getName()).warn( - "Exception while parsing job state. Defaulting to KILLED", e); + LOG.warn("Exception while parsing job state. 
Defaulting to KILLED", e); js = JobState.KILLED; } return js; @@ -165,6 +167,11 @@ public class PartialJob implements org.a public Path getConfFile() { throw new IllegalStateException("Not implemented yet"); } + + @Override + public Configuration loadConfFile() { + throw new IllegalStateException("Not implemented yet"); + } @Override public Map getJobACLs() { Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java Wed Apr 18 23:35:30 2012 @@ -65,7 +65,6 @@ import com.google.inject.Inject; public class HsWebServices { private final HistoryContext ctx; private WebApp webapp; - private final Configuration conf; @Context UriInfo uriInfo; @@ -74,7 +73,6 @@ public class HsWebServices { public HsWebServices(final HistoryContext ctx, final Configuration conf, final WebApp webapp) { this.ctx = ctx; - this.conf = conf; this.webapp = webapp; } @@ -222,7 +220,7 @@ public class HsWebServices { Job job = AMWebServices.getJobFromJobIdString(jid, ctx); ConfInfo info; try { - info = new ConfInfo(job, this.conf); + info = new ConfInfo(job); } catch (IOException e) { throw new NotFoundException("unable to load configuration for job: " + jid); Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEntities.java Wed Apr 18 23:35:30 2012 @@ -22,12 +22,15 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; +import static org.mockito.Mockito.*; + @RunWith(value = Parameterized.class) 
public class TestJobHistoryEntities { @@ -61,10 +64,12 @@ public class TestJobHistoryEntities { /* Verify some expected values based on the history file */ @Test public void testCompletedJob() throws Exception { + HistoryFileInfo info = mock(HistoryFileInfo.class); + when(info.getConfFile()).thenReturn(fullConfPath); //Re-initialize to verify the delayed load. completedJob = new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user", - fullConfPath, jobAclsManager); + info, jobAclsManager); //Verify tasks loaded based on loadTask parameter. assertEquals(loadTasks, completedJob.tasksLoaded.get()); assertEquals(1, completedJob.getAMInfos().size()); @@ -84,9 +89,11 @@ public class TestJobHistoryEntities { @Test public void testCompletedTask() throws Exception { + HistoryFileInfo info = mock(HistoryFileInfo.class); + when(info.getConfFile()).thenReturn(fullConfPath); completedJob = new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user", - fullConfPath, jobAclsManager); + info, jobAclsManager); TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); @@ -111,9 +118,11 @@ public class TestJobHistoryEntities { @Test public void testCompletedTaskAttempt() throws Exception { + HistoryFileInfo info = mock(HistoryFileInfo.class); + when(info.getConfFile()).thenReturn(fullConfPath); completedJob = new CompletedJob(conf, jobId, fulleHistoryPath, loadTasks, "user", - fullConfPath, jobAclsManager); + info, jobAclsManager); TaskId mt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP); TaskId rt1Id = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE); TaskAttemptId mta1Id = MRBuilderUtils.newTaskAttemptId(mt1Id, 0); Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Wed Apr 18 23:35:30 2012 @@ -56,6 +56,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo; import org.apache.hadoop.mapreduce.v2.hs.TestJobHistoryEvents.MRAppWithHistory; import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils; import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; @@ -84,12 +85,22 @@ public class TestJobHistoryParsing { @Test public void testHistoryParsing() throws Exception { - checkHistoryParsing(2, 1, 2); + LOG.info("STARTING testHistoryParsing()"); + try { + checkHistoryParsing(2, 1, 2); + } finally { + LOG.info("FINISHED testHistoryParsing()"); + } } @Test public void 
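
The test hunks above stop passing a bare conf-file Path into CompletedJob and instead hand it a HistoryFileManager.HistoryFileInfo stubbed with Mockito. The stubbing step in isolation; fullConfPath stands in for whatever Path the test already holds.

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;

    public class HistoryFileInfoStubSketch {
      static HistoryFileInfo stubFor(Path fullConfPath) {
        // Only the accessor the code under test needs is stubbed; every other
        // method keeps Mockito's default (null/0/false) answer.
        HistoryFileInfo info = mock(HistoryFileInfo.class);
        when(info.getConfFile()).thenReturn(fullConfPath);
        return info;
      }
    }
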
testHistoryParsingWithParseErrors() throws Exception { - checkHistoryParsing(3, 0, 2); + LOG.info("STARTING testHistoryParsingWithParseErrors()"); + try { + checkHistoryParsing(3, 0, 2); + } finally { + LOG.info("FINISHED testHistoryParsingWithParseErrors()"); + } } private static String getJobSummary(FileContext fc, Path path) throws IOException { @@ -124,61 +135,112 @@ public class TestJobHistoryParsing { String jobhistoryDir = JobHistoryUtils .getHistoryIntermediateDoneDirForUser(conf); - JobHistory jobHistory = new JobHistory(); - jobHistory.init(conf); - - JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId) - .getJobIndexInfo(); - String jobhistoryFileName = FileNameIndexUtils - .getDoneFileName(jobIndexInfo); - - Path historyFilePath = new Path(jobhistoryDir, jobhistoryFileName); - FSDataInputStream in = null; - LOG.info("JobHistoryFile is: " + historyFilePath); + FileContext fc = null; try { fc = FileContext.getFileContext(conf); - in = fc.open(fc.makeQualified(historyFilePath)); } catch (IOException ioe) { - LOG.info("Can not open history file: " + historyFilePath, ioe); - throw (new Exception("Can not open History File")); + LOG.info("Can not get FileContext", ioe); + throw (new Exception("Can not get File Context")); } - - JobHistoryParser parser = new JobHistoryParser(in); - final EventReader realReader = new EventReader(in); - EventReader reader = Mockito.mock(EventReader.class); + if (numMaps == numSuccessfulMaps) { - reader = realReader; - } else { - final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! - Mockito.when(reader.getNextEvent()).thenAnswer( - new Answer() { - public HistoryEvent answer(InvocationOnMock invocation) - throws IOException { - HistoryEvent event = realReader.getNextEvent(); - if (event instanceof TaskFinishedEvent) { - numFinishedEvents.incrementAndGet(); - } - - if (numFinishedEvents.get() <= numSuccessfulMaps) { - return event; - } else { - throw new IOException("test"); + String summaryFileName = JobHistoryUtils + .getIntermediateSummaryFileName(jobId); + Path summaryFile = new Path(jobhistoryDir, summaryFileName); + String jobSummaryString = getJobSummary(fc, summaryFile); + Assert.assertNotNull(jobSummaryString); + Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); + Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); + + Map jobSummaryElements = new HashMap(); + StringTokenizer strToken = new StringTokenizer(jobSummaryString, ","); + while (strToken.hasMoreTokens()) { + String keypair = strToken.nextToken(); + jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]); + } + + Assert.assertEquals("JobId does not match", jobId.toString(), + jobSummaryElements.get("jobId")); + Assert.assertEquals("JobName does not match", "test", + jobSummaryElements.get("jobName")); + Assert.assertTrue("submitTime should not be 0", + Long.parseLong(jobSummaryElements.get("submitTime")) != 0); + Assert.assertTrue("launchTime should not be 0", + Long.parseLong(jobSummaryElements.get("launchTime")) != 0); + Assert.assertTrue("firstMapTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); + Assert + .assertTrue( + "firstReduceTaskLaunchTime should not be 0", + Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); + Assert.assertTrue("finishTime should not be 0", + Long.parseLong(jobSummaryElements.get("finishTime")) != 0); + Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps, + 
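
The summary checks above first tokenize the one-line job summary into key=value pairs and then assert on individual fields. The parsing step on its own, with a made-up summary line standing in for the real file contents:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.StringTokenizer;

    public class JobSummaryParseSketch {
      public static void main(String[] args) {
        // Shape of an intermediate job summary line (values are made up).
        String jobSummaryString =
            "jobId=job_1334567890123_0001,jobName=test,user=nobody,"
            + "queue=default,status=SUCCEEDED,numMaps=2,numReduces=1";

        Map<String, String> jobSummaryElements = new HashMap<String, String>();
        StringTokenizer strToken = new StringTokenizer(jobSummaryString, ",");
        while (strToken.hasMoreTokens()) {
          // Each token is key=value; split on '=' and keep both halves.
          String keypair = strToken.nextToken();
          jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]);
        }

        System.out.println(jobSummaryElements.get("status")); // SUCCEEDED
      }
    }
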
Integer.parseInt(jobSummaryElements.get("numMaps"))); + Assert.assertEquals("Mismatch in num reduce slots", numReduces, + Integer.parseInt(jobSummaryElements.get("numReduces"))); + Assert.assertEquals("User does not match", System.getProperty("user.name"), + jobSummaryElements.get("user")); + Assert.assertEquals("Queue does not match", "default", + jobSummaryElements.get("queue")); + Assert.assertEquals("Status does not match", "SUCCEEDED", + jobSummaryElements.get("status")); + } + + JobHistory jobHistory = new JobHistory(); + jobHistory.init(conf); + HistoryFileInfo fileInfo = jobHistory.getJobFileInfo(jobId); + JobInfo jobInfo; + long numFinishedMaps; + + synchronized(fileInfo) { + Path historyFilePath = fileInfo.getHistoryFile(); + FSDataInputStream in = null; + LOG.info("JobHistoryFile is: " + historyFilePath); + try { + in = fc.open(fc.makeQualified(historyFilePath)); + } catch (IOException ioe) { + LOG.info("Can not open history file: " + historyFilePath, ioe); + throw (new Exception("Can not open History File")); + } + + JobHistoryParser parser = new JobHistoryParser(in); + final EventReader realReader = new EventReader(in); + EventReader reader = Mockito.mock(EventReader.class); + if (numMaps == numSuccessfulMaps) { + reader = realReader; + } else { + final AtomicInteger numFinishedEvents = new AtomicInteger(0); // Hack! + Mockito.when(reader.getNextEvent()).thenAnswer( + new Answer() { + public HistoryEvent answer(InvocationOnMock invocation) + throws IOException { + HistoryEvent event = realReader.getNextEvent(); + if (event instanceof TaskFinishedEvent) { + numFinishedEvents.incrementAndGet(); + } + + if (numFinishedEvents.get() <= numSuccessfulMaps) { + return event; + } else { + throw new IOException("test"); + } } } - } ); - } - - JobInfo jobInfo = parser.parse(reader); - - long numFinishedMaps = + } + + jobInfo = parser.parse(reader); + + numFinishedMaps = computeFinishedMaps(jobInfo, numMaps, numSuccessfulMaps); - - if (numFinishedMaps != numMaps) { - Exception parseException = parser.getParseException(); - Assert.assertNotNull("Didn't get expected parse exception", - parseException); + + if (numFinishedMaps != numMaps) { + Exception parseException = parser.getParseException(); + Assert.assertNotNull("Didn't get expected parse exception", + parseException); + } } Assert.assertEquals("Incorrect username ", System.getProperty("user.name"), @@ -246,52 +308,6 @@ public class TestJobHistoryParsing { } } } - - if (numMaps == numSuccessfulMaps) { - - String summaryFileName = JobHistoryUtils - .getIntermediateSummaryFileName(jobId); - Path summaryFile = new Path(jobhistoryDir, summaryFileName); - String jobSummaryString = getJobSummary(fc, summaryFile); - Assert.assertTrue(jobSummaryString.contains("resourcesPerMap=100")); - Assert.assertTrue(jobSummaryString.contains("resourcesPerReduce=100")); - Assert.assertNotNull(jobSummaryString); - - Map jobSummaryElements = new HashMap(); - StringTokenizer strToken = new StringTokenizer(jobSummaryString, ","); - while (strToken.hasMoreTokens()) { - String keypair = strToken.nextToken(); - jobSummaryElements.put(keypair.split("=")[0], keypair.split("=")[1]); - - } - - Assert.assertEquals("JobId does not match", jobId.toString(), - jobSummaryElements.get("jobId")); - Assert.assertEquals("JobName does not match", "test", - jobSummaryElements.get("jobName")); - Assert.assertTrue("submitTime should not be 0", - Long.parseLong(jobSummaryElements.get("submitTime")) != 0); - Assert.assertTrue("launchTime should not be 0", - 
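
The mocked EventReader above delegates to a real reader until enough TaskFinishedEvents have been returned, then throws IOException to force a parse error. The counting-Answer pattern in isolation, with a small hypothetical RecordReader interface standing in for EventReader so the sketch does not depend on the history event classes:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class FailAfterNSketch {
      /** Stand-in for EventReader: yields one record per call. */
      interface RecordReader {
        String next() throws IOException;
      }

      static RecordReader failAfter(final RecordReader real, final int limit)
          throws IOException {
        final AtomicInteger seen = new AtomicInteger(0);
        RecordReader reader = mock(RecordReader.class);
        // Delegate to the real reader until 'limit' records have been seen,
        // then simulate a truncated/corrupt stream with an IOException.
        when(reader.next()).thenAnswer(new Answer<String>() {
          public String answer(InvocationOnMock invocation) throws IOException {
            if (seen.incrementAndGet() <= limit) {
              return real.next();
            }
            throw new IOException("test");
          }
        });
        return reader;
      }
    }
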
Long.parseLong(jobSummaryElements.get("launchTime")) != 0); - Assert.assertTrue("firstMapTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstMapTaskLaunchTime")) != 0); - Assert - .assertTrue( - "firstReduceTaskLaunchTime should not be 0", - Long.parseLong(jobSummaryElements.get("firstReduceTaskLaunchTime")) != 0); - Assert.assertTrue("finishTime should not be 0", - Long.parseLong(jobSummaryElements.get("finishTime")) != 0); - Assert.assertEquals("Mismatch in num map slots", numSuccessfulMaps, - Integer.parseInt(jobSummaryElements.get("numMaps"))); - Assert.assertEquals("Mismatch in num reduce slots", numReduces, - Integer.parseInt(jobSummaryElements.get("numReduces"))); - Assert.assertEquals("User does not match", System.getProperty("user.name"), - jobSummaryElements.get("user")); - Assert.assertEquals("Queue does not match", "default", - jobSummaryElements.get("queue")); - Assert.assertEquals("Status does not match", "SUCCEEDED", - jobSummaryElements.get("status")); - } } // Computes finished maps similar to RecoveryService... @@ -314,6 +330,8 @@ public class TestJobHistoryParsing { @Test public void testHistoryParsingForFailedAttempts() throws Exception { + LOG.info("STARTING testHistoryParsingForFailedAttempts"); + try { Configuration conf = new Configuration(); conf .setClass( @@ -335,7 +353,7 @@ public class TestJobHistoryParsing { JobHistory jobHistory = new JobHistory(); jobHistory.init(conf); - JobIndexInfo jobIndexInfo = jobHistory.getJobMetaInfo(jobId) + JobIndexInfo jobIndexInfo = jobHistory.getJobFileInfo(jobId) .getJobIndexInfo(); String jobhistoryFileName = FileNameIndexUtils .getDoneFileName(jobIndexInfo); @@ -372,6 +390,9 @@ public class TestJobHistoryParsing { } } Assert.assertEquals("No of Failed tasks doesn't match.", 2, noOffailedAttempts); + } finally { + LOG.info("FINISHED testHistoryParsingForFailedAttempts"); + } } static class MRAppWithHistoryWithFailedAttempt extends MRAppWithHistory { Modified: hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1327724&r1=1327723&r2=1327724&view=diff ============================================================================== --- hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original) +++ hadoop/common/branches/HDFS-3042/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Wed Apr 18 23:35:30 2012 @@ -40,7 +40,6 @@ import org.apache.hadoop.mapreduce.secur import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.mapreduce.v2.util.MRApps; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ClientRMProtocol; @@ -88,12 +87,10 @@ public class ResourceMgrDelegate { public ResourceMgrDelegate(YarnConfiguration conf) { this.conf = conf; YarnRPC rpc = YarnRPC.create(this.conf); - 
InetSocketAddress rmAddress = - NetUtils.createSocketAddr(this.conf.get( + InetSocketAddress rmAddress = conf.getSocketAddr( YarnConfiguration.RM_ADDRESS, - YarnConfiguration.DEFAULT_RM_ADDRESS), - YarnConfiguration.DEFAULT_RM_PORT, - YarnConfiguration.RM_ADDRESS); + YarnConfiguration.DEFAULT_RM_ADDRESS, + YarnConfiguration.DEFAULT_RM_PORT); this.rmAddress = rmAddress.toString(); LOG.debug("Connecting to ResourceManager at " + rmAddress); applicationsManager =
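
The ResourceMgrDelegate hunk above swaps the hand-rolled NetUtils.createSocketAddr call for Configuration.getSocketAddr, which resolves the configured value against a default address and default port in one step. The call in isolation, using the same YarnConfiguration constants as the patch:

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class RmAddressSketch {
      public static void main(String[] args) {
        Configuration conf = new YarnConfiguration();
        // Falls back to the default RM address/port when the key is unset
        // or the configured value has no port.
        InetSocketAddress rmAddress = conf.getSocketAddr(
            YarnConfiguration.RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_ADDRESS,
            YarnConfiguration.DEFAULT_RM_PORT);
        System.out.println("Connecting to ResourceManager at " + rmAddress);
      }
    }
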