From mapreduce-commits-return-280-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Thu Sep 17 05:05:10 2009 Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 90443 invoked from network); 17 Sep 2009 05:05:10 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 17 Sep 2009 05:05:10 -0000 Received: (qmail 94374 invoked by uid 500); 17 Sep 2009 05:05:10 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 94286 invoked by uid 500); 17 Sep 2009 05:05:10 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 94268 invoked by uid 99); 17 Sep 2009 05:05:09 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Sep 2009 05:05:09 +0000 X-ASF-Spam-Status: No, hits=-1996.5 required=10.0 tests=ALL_TRUSTED,URIBL_BLACK X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Sep 2009 05:04:49 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 436A123888FD; Thu, 17 Sep 2009 05:04:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r816052 [2/5] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/con... 
Date: Thu, 17 Sep 2009 05:04:27 -0000 To: mapreduce-commits@hadoop.apache.org From: sharad@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090917050428.436A123888FD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=816052&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Thu Sep 17 05:04:21 2009 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.CounterGroup; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.jobhistory.EventReader.CounterFields; +import org.apache.hadoop.mapreduce.jobhistory.EventReader.GroupFields; +import org.codehaus.jackson.JsonEncoding; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerator; + +/** + * Event Writer is an utility class used to write events to the underlying + * stream. Typically, one event writer (which translates to one stream) + * is created per job + * + */ +class EventWriter { + + static final JsonFactory FACTORY = new JsonFactory(); + private JsonGenerator gen; + + EventWriter(FSDataOutputStream out) throws IOException { + gen = FACTORY.createJsonGenerator(out, JsonEncoding.UTF8); + // Prefix all log files with the version + writeVersionInfo(); + } + + private void writeVersionInfo() throws IOException { + gen.writeStartObject(); + gen.writeStringField("HISTORY_VERSION", JobHistory.HISTORY_VERSION); + gen.writeEndObject(); + gen.writeRaw("\n"); + } + + synchronized void write(HistoryEvent event) + throws IOException { + writeEventType(gen, event.getEventType()); + event.writeFields(gen); + gen.writeRaw("\n"); + } + + void flush() throws IOException { + gen.flush(); + } + + void close() throws IOException { + gen.close(); + } + + /** + * Write the event type to the JsonGenerator + * @param gen + * @param type + * @throws IOException + */ + private void writeEventType(JsonGenerator gen, EventType type) + throws IOException { + gen.writeStartObject(); + gen.writeStringField("EVENT_TYPE", type.toString()); + gen.writeEndObject(); + } + + static void writeCounters(Counters counters, JsonGenerator gen) + throws IOException { + gen.writeFieldName("COUNTERS"); + 
gen.writeStartArray(); // Start of all groups + Iterator groupItr = counters.iterator(); + while (groupItr.hasNext()) { + writeOneGroup(gen, groupItr.next()); + } + gen.writeEndArray(); // End of all groups + } + + static void writeOneGroup (JsonGenerator gen, CounterGroup grp) + throws IOException { + gen.writeStartObject(); // Start of this group + gen.writeStringField(GroupFields.ID.toString(), grp.getName()); + gen.writeStringField(GroupFields.NAME.toString(), grp.getDisplayName()); + + // Write out the List of counters + gen.writeFieldName(GroupFields.LIST.toString()); + gen.writeStartArray(); // Start array of counters + Iterator ctrItr = grp.iterator(); + while (ctrItr.hasNext()) { + writeOneCounter(gen, ctrItr.next()); + } + gen.writeEndArray(); // End of all counters + + gen.writeEndObject(); // End of this group + } + + static void writeOneCounter(JsonGenerator gen, Counter ctr) + throws IOException{ + gen.writeStartObject(); + gen.writeStringField(CounterFields.ID.toString(), ctr.getName()); + gen.writeStringField(CounterFields.NAME.toString(), ctr.getDisplayName()); + gen.writeNumberField("VALUE", ctr.getValue()); + gen.writeEndObject(); + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java?rev=816052&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java Thu Sep 17 05:04:21 2009 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.IOException; + +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonParser; + +/** + * The interface all job history events implement + * + */ +public interface HistoryEvent { + + // The category that history event belongs to + enum EventCategory { + JOB, TASK, TASK_ATTEMPT + } + + /** + * Serialize the Fields of the event to the JsonGenerator + * @param gen JsonGenerator to write to + * @throws IOException + */ + void writeFields (JsonGenerator gen) throws IOException; + + /** + * Deserialize the fields of the event from the JsonParser + * @param parser JsonParser to read from + * @throws IOException + */ + void readFields(JsonParser parser) throws IOException; + + /** Return the event type */ + EventType getEventType(); + + /** Retun the event category */ + EventCategory getEventCategory(); +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=816052&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (added) +++ 
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Thu Sep 17 05:04:21 2009 @@ -0,0 +1,743 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobInProgress; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapred.TaskLogServlet; +import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.util.StringUtils; + +/** + * HistoryViewer is used to parse and view the JobHistory files + * + */ +public class HistoryViewer { + private static 
SimpleDateFormat dateFormat = + new SimpleDateFormat("d-MMM-yyyy HH:mm:ss"); + private FileSystem fs; + private JobInfo job; + private String jobId; + private boolean printAll; + +/** + * Constructs the HistoryViewer object + * @param historyFile The fully qualified Path of the History File + * @param conf The Configuration file + * @param printAll Toggle to print all status to only killed/failed status + * @throws IOException + */ + public HistoryViewer(String historyFile, + Configuration conf, + boolean printAll) throws IOException { + this.printAll = printAll; + String errorMsg = "Unable to initialize History Viewer"; + try { + Path jobFile = new Path(historyFile); + fs = jobFile.getFileSystem(conf); + String[] jobDetails = + jobFile.getName().split("_"); + if (jobDetails.length < 2) { + // NOT a valid name + System.err.println("Ignore unrecognized file: " + jobFile.getName()); + throw new IOException(errorMsg); + } + JobHistoryParser parser = new JobHistoryParser(fs, jobFile); + job = parser.parse(); + jobId = job.getJobId().toString(); + } catch(Exception e) { + throw new IOException(errorMsg, e); + } + } + + /** + * Print the job/task/attempt summary information + * @throws IOException + */ + public void print() throws IOException{ + printJobDetails(); + printTaskSummary(); + printJobAnalysis(); + printTasks(TaskType.JOB_SETUP, TaskStatus.State.FAILED.toString()); + printTasks(TaskType.JOB_SETUP, TaskStatus.State.KILLED.toString()); + printTasks(TaskType.MAP, TaskStatus.State.FAILED.toString()); + printTasks(TaskType.MAP, TaskStatus.State.KILLED.toString()); + printTasks(TaskType.REDUCE, TaskStatus.State.FAILED.toString()); + printTasks(TaskType.REDUCE, TaskStatus.State.KILLED.toString()); + printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.FAILED.toString()); + printTasks(TaskType.JOB_CLEANUP, + JobStatus.getJobRunState(JobStatus.KILLED)); + if (printAll) { + printTasks(TaskType.JOB_SETUP, TaskStatus.State.SUCCEEDED.toString()); + printTasks(TaskType.MAP, 
TaskStatus.State.SUCCEEDED.toString()); + printTasks(TaskType.REDUCE, TaskStatus.State.SUCCEEDED.toString()); + printTasks(TaskType.JOB_CLEANUP, TaskStatus.State.SUCCEEDED.toString()); + printAllTaskAttempts(TaskType.JOB_SETUP); + printAllTaskAttempts(TaskType.MAP); + printAllTaskAttempts(TaskType.REDUCE); + printAllTaskAttempts(TaskType.JOB_CLEANUP); + } + + FilteredJob filter = new FilteredJob(job, + TaskStatus.State.FAILED.toString()); + printFailedAttempts(filter); + + filter = new FilteredJob(job, + TaskStatus.State.KILLED.toString()); + printFailedAttempts(filter); + } + + private void printJobDetails() { + StringBuffer jobDetails = new StringBuffer(); + jobDetails.append("\nHadoop job: " ).append(job.getJobId()); + jobDetails.append("\n====================================="); + jobDetails.append("\nUser: ").append(job.getUsername()); + jobDetails.append("\nJobName: ").append(job.getJobname()); + jobDetails.append("\nJobConf: ").append(job.getJobConfPath()); + jobDetails.append("\nSubmitted At: ").append(StringUtils. + getFormattedTimeWithDiff(dateFormat, + job.getSubmitTime(), 0)); + jobDetails.append("\nLaunched At: ").append(StringUtils. + getFormattedTimeWithDiff(dateFormat, + job.getLaunchTime(), + job.getSubmitTime())); + jobDetails.append("\nFinished At: ").append(StringUtils. + getFormattedTimeWithDiff(dateFormat, + job.getFinishTime(), + job.getLaunchTime())); + jobDetails.append("\nStatus: ").append(((job.getJobStatus() == null) ? 
+ "Incomplete" :job.getJobStatus())); + jobDetails.append("\n====================================="); + System.out.println(jobDetails.toString()); + } + + private void printAllTaskAttempts(TaskType taskType) { + Map tasks = job.getAllTasks(); + StringBuffer taskList = new StringBuffer(); + taskList.append("\n").append(taskType); + taskList.append(" task list for ").append(job.getJobId()); + taskList.append("\nTaskId\t\tStartTime"); + if (TaskType.REDUCE.equals(taskType)) { + taskList.append("\tShuffleFinished\tSortFinished"); + } + taskList.append("\tFinishTime\tHostName\tError\tTaskLogs"); + taskList.append("\n===================================================="); + System.out.println(taskList.toString()); + for (JobHistoryParser.TaskInfo task : tasks.values()) { + for (JobHistoryParser.TaskAttemptInfo attempt : + task.getAllTaskAttempts().values()) { + if (taskType.equals(task.getTaskType())){ + taskList.setLength(0); + taskList.append(attempt.getAttemptId()).append("\t"); + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getStartTime(), 0)).append("\t"); + if (TaskType.REDUCE.equals(taskType)) { + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getShuffleFinishTime(), + attempt.getStartTime())); + taskList.append("\t"); + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getSortFinishTime(), + attempt.getShuffleFinishTime())); + } + taskList.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + attempt.getFinishTime(), + attempt.getStartTime())); + taskList.append("\t"); + taskList.append(attempt.getHostname()).append("\t"); + taskList.append(attempt.getError()); + String taskLogsUrl = getTaskLogsUrl(attempt); + taskList.append(taskLogsUrl != null ? 
taskLogsUrl : "n/a"); + System.out.println(taskList.toString()); + } + } + } + } + + private void printTaskSummary() { + SummarizedJob ts = new SummarizedJob(job); + StringBuffer taskSummary = new StringBuffer(); + taskSummary.append("\nTask Summary"); + taskSummary.append("\n============================"); + taskSummary.append("\nKind\tTotal\t"); + taskSummary.append("Successful\tFailed\tKilled\tStartTime\tFinishTime"); + taskSummary.append("\n"); + taskSummary.append("\nSetup\t").append(ts.totalSetups); + taskSummary.append("\t").append(ts.numFinishedSetups); + taskSummary.append("\t\t").append(ts.numFailedSetups); + taskSummary.append("\t").append(ts.numKilledSetups); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.setupStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.setupFinished, ts.setupStarted)); + taskSummary.append("\nMap\t").append(ts.totalMaps); + taskSummary.append("\t").append(job.getFinishedMaps()); + taskSummary.append("\t\t").append(ts.numFailedMaps); + taskSummary.append("\t").append(ts.numKilledMaps); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.mapStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.mapFinished, ts.mapStarted)); + taskSummary.append("\nReduce\t").append(ts.totalReduces); + taskSummary.append("\t").append(job.getFinishedReduces()); + taskSummary.append("\t\t").append(ts.numFailedReduces); + taskSummary.append("\t").append(ts.numKilledReduces); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.reduceStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.reduceFinished, ts.reduceStarted)); + taskSummary.append("\nCleanup\t").append(ts.totalCleanups); + taskSummary.append("\t").append(ts.numFinishedCleanups); + 
taskSummary.append("\t\t").append(ts.numFailedCleanups); + taskSummary.append("\t").append(ts.numKilledCleanups); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.cleanupStarted, 0)); + taskSummary.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, ts.cleanupFinished, + ts.cleanupStarted)); + taskSummary.append("\n============================\n"); + System.out.println(taskSummary.toString()); + } + + private void printJobAnalysis() { + if (!job.getJobStatus().equals + (JobStatus.getJobRunState(JobStatus.SUCCEEDED))) { + System.out.println("No Analysis available as job did not finish"); + return; + } + + AnalyzedJob avg = new AnalyzedJob(job); + + System.out.println("\nAnalysis"); + System.out.println("========="); + printAnalysis(avg.getMapTasks(), cMap, "map", avg.getAvgMapTime(), 10); + printLast(avg.getMapTasks(), "map", cFinishMapRed); + + if (avg.getReduceTasks().length > 0) { + printAnalysis(avg.getReduceTasks(), cShuffle, "shuffle", + avg.getAvgShuffleTime(), 10); + printLast(avg.getReduceTasks(), "shuffle", cFinishShuffle); + + printAnalysis(avg.getReduceTasks(), cReduce, "reduce", + avg.getAvgReduceTime(), 10); + printLast(avg.getReduceTasks(), "reduce", cFinishMapRed); + } + System.out.println("========="); + } + + private void printAnalysis(JobHistoryParser.TaskAttemptInfo [] tasks, + Comparator cmp, + String taskType, + long avg, + int showTasks) { + Arrays.sort(tasks, cmp); + JobHistoryParser.TaskAttemptInfo min = tasks[tasks.length-1]; + StringBuffer details = new StringBuffer(); + details.append("\nTime taken by best performing "); + details.append(taskType).append(" task "); + details.append(min.getAttemptId().getTaskID().toString()).append(": "); + if ("map".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + min.getFinishTime(), + min.getStartTime())); + } else if ("shuffle".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + min.getShuffleFinishTime(), + 
min.getStartTime())); + } else { + details.append(StringUtils.formatTimeDiff( + min.getFinishTime(), + min.getShuffleFinishTime())); + } + details.append("\nAverage time taken by "); + details.append(taskType).append(" tasks: "); + details.append(StringUtils.formatTimeDiff(avg, 0)); + details.append("\nWorse performing "); + details.append(taskType).append(" tasks: "); + details.append("\nTaskId\t\tTimetaken"); + System.out.println(details.toString()); + for (int i = 0; i < showTasks && i < tasks.length; i++) { + details.setLength(0); + details.append(tasks[i].getAttemptId().getTaskID()).append(" "); + if ("map".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + tasks[i].getFinishTime(), + tasks[i].getStartTime())); + } else if ("shuffle".equals(taskType)) { + details.append(StringUtils.formatTimeDiff( + tasks[i].getShuffleFinishTime(), + tasks[i].getStartTime())); + } else { + details.append(StringUtils.formatTimeDiff( + tasks[i].getFinishTime(), + tasks[i].getShuffleFinishTime())); + } + System.out.println(details.toString()); + } + } + + private void printLast(JobHistoryParser.TaskAttemptInfo [] tasks, + String taskType, + Comparator cmp + ) { + Arrays.sort(tasks, cFinishMapRed); + JobHistoryParser.TaskAttemptInfo last = tasks[0]; + StringBuffer lastBuf = new StringBuffer(); + lastBuf.append("The last ").append(taskType); + lastBuf.append(" task ").append(last.getAttemptId().getTaskID()); + Long finishTime; + if ("shuffle".equals(taskType)) { + finishTime = last.getShuffleFinishTime(); + } else { + finishTime = last.getFinishTime(); + } + lastBuf.append(" finished at (relative to the Job launch time): "); + lastBuf.append(StringUtils.getFormattedTimeWithDiff(dateFormat, + finishTime, job.getLaunchTime())); + System.out.println(lastBuf.toString()); + } + + private void printTasks(TaskType taskType, String status) { + Map tasks = job.getAllTasks(); + StringBuffer header = new StringBuffer(); + header.append("\n").append(status).append(" "); + 
header.append(taskType).append(" task list for ").append(jobId); + header.append("\nTaskId\t\tStartTime\tFinishTime\tError"); + if (TaskType.MAP.equals(taskType)) { + header.append("\tInputSplits"); + } + header.append("\n===================================================="); + StringBuffer taskList = new StringBuffer(); + for (JobHistoryParser.TaskInfo task : tasks.values()) { + if (taskType.equals(task.getTaskType()) && + (status.equals(task.getTaskStatus()) + || status.equalsIgnoreCase("ALL"))) { + taskList.setLength(0); + taskList.append(task.getTaskId()); + taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, task.getStartTime(), 0)); + taskList.append("\t").append(StringUtils.getFormattedTimeWithDiff( + dateFormat, task.getFinishTime(), + task.getStartTime())); + taskList.append("\t").append(task.getError()); + if (TaskType.MAP.equals(taskType)) { + taskList.append("\t").append(task.getSplitLocations()); + } + if (taskList != null) { + System.out.println(header.toString()); + System.out.println(taskList.toString()); + } + } + } + } + + private void printFailedAttempts(FilteredJob filteredJob) { + Map> badNodes = filteredJob.getFilteredMap(); + StringBuffer attempts = new StringBuffer(); + if (badNodes.size() > 0) { + attempts.append("\n").append(filteredJob.getFilter()); + attempts.append(" task attempts by nodes"); + attempts.append("\nHostname\tFailedTasks"); + attempts.append("\n==============================="); + System.out.println(attempts.toString()); + for (Map.Entry> entry : badNodes.entrySet()) { + String node = entry.getKey(); + Set failedTasks = entry.getValue(); + attempts.setLength(0); + attempts.append(node).append("\t"); + for (TaskID t : failedTasks) { + attempts.append(t).append(", "); + } + System.out.println(attempts.toString()); + } + } + } + + /** + * Return the TaskLogsUrl of a particular TaskAttempt + * + * @param attempt + * @return the taskLogsUrl. 
null if http-port or tracker-name or + * task-attempt-id are unavailable. + */ + public static String getTaskLogsUrl( + JobHistoryParser.TaskAttemptInfo attempt) { + if (attempt.getHttpPort() == -1 + || attempt.getTrackerName().equals("") + || attempt.getAttemptId() == null) { + return null; + } + + String taskTrackerName = + JobInProgress.convertTrackerNameToHostName( + attempt.getTrackerName()); + return TaskLogServlet.getTaskLogUrl(taskTrackerName, + Integer.toString(attempt.getHttpPort()), + attempt.getAttemptId().toString()); + } + + private Comparator cMap = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getFinishTime() - t1.getStartTime(); + long l2 = t2.getFinishTime() - t2.getStartTime(); + return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); + } + }; + + private Comparator cShuffle = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getShuffleFinishTime() - t1.getStartTime(); + long l2 = t2.getShuffleFinishTime() - t2.getStartTime(); + return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); + } + }; + + private Comparator cFinishShuffle = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getShuffleFinishTime(); + long l2 = t2.getShuffleFinishTime(); + return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); + } + }; + + private Comparator cFinishMapRed = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getFinishTime(); + long l2 = t2.getFinishTime(); + return (l2 < l1 ? -1 : (l2 == l1 ? 
0 : 1)); + } + }; + + private Comparator cReduce = + new Comparator() { + public int compare(JobHistoryParser.TaskAttemptInfo t1, + JobHistoryParser.TaskAttemptInfo t2) { + long l1 = t1.getFinishTime() - + t1.getShuffleFinishTime(); + long l2 = t2.getFinishTime() - + t2.getShuffleFinishTime(); + return (l2 < l1 ? -1 : (l2 == l1 ? 0 : 1)); + } + }; + + /** + * Utility class used the summarize the job. + * Used by HistoryViewer and the JobHistory UI. + * + */ + public static class SummarizedJob { + Map tasks; + int totalMaps = 0; + int totalReduces = 0; + int totalCleanups = 0; + int totalSetups = 0; + int numFailedMaps = 0; + int numKilledMaps = 0; + int numFailedReduces = 0; + int numKilledReduces = 0; + int numFinishedCleanups = 0; + int numFailedCleanups = 0; + int numKilledCleanups = 0; + int numFinishedSetups = 0; + int numFailedSetups = 0; + int numKilledSetups = 0; + long mapStarted = 0; + long mapFinished = 0; + long reduceStarted = 0; + long reduceFinished = 0; + long cleanupStarted = 0; + long cleanupFinished = 0; + long setupStarted = 0; + long setupFinished = 0; + + /** Get total maps */ + public int getTotalMaps() { return totalMaps; } + /** Get total reduces */ + public int getTotalReduces() { return totalReduces; } + /** Get number of clean up tasks */ + public int getTotalCleanups() { return totalCleanups; } + /** Get number of set up tasks */ + public int getTotalSetups() { return totalSetups; } + /** Get number of failed maps */ + public int getNumFailedMaps() { return numFailedMaps; } + /** Get number of killed maps */ + public int getNumKilledMaps() { return numKilledMaps; } + /** Get number of failed reduces */ + public int getNumFailedReduces() { return numFailedReduces; } + /** Get number of killed reduces */ + public int getNumKilledReduces() { return numKilledReduces; } + /** Get number of cleanup tasks that finished */ + public int getNumFinishedCleanups() { return numFinishedCleanups; } + /** Get number of failed cleanup tasks */ + public 
int getNumFailedCleanups() { return numFailedCleanups; } + /** Get number of killed cleanup tasks */ + public int getNumKilledCleanups() { return numKilledCleanups; } + /** Get number of finished set up tasks */ + public int getNumFinishedSetups() { return numFinishedSetups; } + /** Get number of failed set up tasks */ + public int getNumFailedSetups() { return numFailedSetups; } + /** Get number of killed set up tasks */ + public int getNumKilledSetups() { return numKilledSetups; } + /** Get number of maps that were started */ + public long getMapStarted() { return mapStarted; } + /** Get number of maps that finished */ + public long getMapFinished() { return mapFinished; } + /** Get number of Reducers that were started */ + public long getReduceStarted() { return reduceStarted; } + /** Get number of reducers that finished */ + public long getReduceFinished() { return reduceFinished; } + /** Get number of cleanup tasks started */ + public long getCleanupStarted() { return cleanupStarted; } + /** Get number of cleanup tasks that finished */ + public long getCleanupFinished() { return cleanupFinished; } + /** Get number of setup tasks that started */ + public long getSetupStarted() { return setupStarted; } + /** Get number of setup tasks that finished */ + public long getSetupFinished() { return setupFinished; } + + /** Create summary information for the parsed job */ + public SummarizedJob(JobInfo job) { + tasks = job.getAllTasks(); + + for (JobHistoryParser.TaskInfo task : tasks.values()) { + Map attempts = + task.getAllTaskAttempts(); + //allHosts.put(task.getHo(Keys.HOSTNAME), ""); + for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) { + long startTime = attempt.getStartTime(); + long finishTime = attempt.getFinishTime(); + if (attempt.getTaskType().equals(TaskType.MAP)) { + if (mapStarted== 0 || mapStarted > startTime) { + mapStarted = startTime; + } + if (mapFinished < finishTime) { + mapFinished = finishTime; + } + totalMaps++; + if 
(attempt.getTaskStatus().equals + (TaskStatus.State.FAILED.toString())) { + numFailedMaps++; + } else if (attempt.getTaskStatus().equals + (TaskStatus.State.KILLED.toString())) { + numKilledMaps++; + } + } else if (attempt.getTaskType().equals(TaskType.REDUCE)) { + if (reduceStarted==0||reduceStarted > startTime) { + reduceStarted = startTime; + } + if (reduceFinished < finishTime) { + reduceFinished = finishTime; + } + totalReduces++; + if (attempt.getTaskStatus().equals + (TaskStatus.State.FAILED.toString())) { + numFailedReduces++; + } else if (attempt.getTaskStatus().equals + (TaskStatus.State.KILLED.toString())) { + numKilledReduces++; + } + } else if (attempt.getTaskType().equals(TaskType.JOB_CLEANUP)) { + if (cleanupStarted==0||cleanupStarted > startTime) { + cleanupStarted = startTime; + } + if (cleanupFinished < finishTime) { + cleanupFinished = finishTime; + } + totalCleanups++; + if (attempt.getTaskStatus().equals + (TaskStatus.State.SUCCEEDED.toString())) { + numFinishedCleanups++; + } else if (attempt.getTaskStatus().equals + (TaskStatus.State.FAILED.toString())) { + numFailedCleanups++; + } else if (attempt.getTaskStatus().equals + (TaskStatus.State.KILLED.toString())) { + numKilledCleanups++; + } + } else if (attempt.getTaskType().equals(TaskType.JOB_SETUP)) { + if (setupStarted==0||setupStarted > startTime) { + setupStarted = startTime; + } + if (setupFinished < finishTime) { + setupFinished = finishTime; + } + totalSetups++; + if (attempt.getTaskStatus().equals + (TaskStatus.State.SUCCEEDED.toString())) { + numFinishedSetups++; + } else if (attempt.getTaskStatus().equals + (TaskStatus.State.FAILED.toString())) { + numFailedSetups++; + } else if (attempt.getTaskStatus().equals + (TaskStatus.State.KILLED.toString())) { + numKilledSetups++; + } + } + } + } + } + } + + /** + * Utility class used while analyzing the job. + * Used by HistoryViewer and the JobHistory UI. 
+ */ + + public static class AnalyzedJob { + private long avgMapTime; + private long avgReduceTime; + private long avgShuffleTime; + + private JobHistoryParser.TaskAttemptInfo [] mapTasks; + private JobHistoryParser.TaskAttemptInfo [] reduceTasks; + + /** Get the average map time */ + public long getAvgMapTime() { return avgMapTime; } + /** Get the average reduce time */ + public long getAvgReduceTime() { return avgReduceTime; } + /** Get the average shuffle time */ + public long getAvgShuffleTime() { return avgShuffleTime; } + /** Get the map tasks list */ + public JobHistoryParser.TaskAttemptInfo [] getMapTasks() { + return mapTasks; + } + /** Get the reduce tasks list */ + public JobHistoryParser.TaskAttemptInfo [] getReduceTasks() { + return reduceTasks; + } + /** Generate analysis information for the parsed job */ + public AnalyzedJob (JobInfo job) { + Map tasks = job.getAllTasks(); + int finishedMaps = (int) job.getFinishedMaps(); + int finishedReduces = (int) job.getFinishedReduces(); + mapTasks = + new JobHistoryParser.TaskAttemptInfo[finishedMaps]; + reduceTasks = + new JobHistoryParser.TaskAttemptInfo[finishedReduces]; + int mapIndex = 0 , reduceIndex=0; + avgMapTime = 0; + avgReduceTime = 0; + avgShuffleTime = 0; + + for (JobHistoryParser.TaskInfo task : tasks.values()) { + Map attempts = + task.getAllTaskAttempts(); + for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) { + if (attempt.getTaskStatus(). 
+ equals(TaskStatus.State.SUCCEEDED.toString())) { + long avgFinishTime = (attempt.getFinishTime() - + attempt.getStartTime()); + if (attempt.getTaskType().equals(TaskType.MAP)) { + mapTasks[mapIndex++] = attempt; + avgMapTime += avgFinishTime; + } else if (attempt.getTaskType().equals(TaskType.REDUCE)) { + reduceTasks[reduceIndex++] = attempt; + avgShuffleTime += (attempt.getShuffleFinishTime() - + attempt.getStartTime()); + avgReduceTime += (attempt.getFinishTime() - + attempt.getShuffleFinishTime()); + } + break; + } + } + } + if (finishedMaps > 0) { + avgMapTime /= finishedMaps; + } + if (finishedReduces > 0) { + avgReduceTime /= finishedReduces; + avgShuffleTime /= finishedReduces; + } + } + } + + /** + * Utility to filter out events based on the task status + * + */ + public static class FilteredJob { + + private Map> badNodesToFilteredTasks = + new HashMap>(); + + private String filter; + + /** Get the map of the filtered tasks */ + public Map> getFilteredMap() { + return badNodesToFilteredTasks; + } + + /** Get the current filter */ + public String getFilter() { return filter; } + + /** Apply the filter (status) on the parsed job and generate summary */ + public FilteredJob(JobInfo job, String status) { + + filter = status; + + Map tasks = job.getAllTasks(); + + for (JobHistoryParser.TaskInfo task : tasks.values()) { + Map attempts = + task.getAllTaskAttempts(); + for (JobHistoryParser.TaskAttemptInfo attempt : attempts.values()) { + if (attempt.getTaskStatus().equals(status)) { + String hostname = attempt.getHostname(); + TaskID id = attempt.getAttemptId().getTaskID(); + + Set set = badNodesToFilteredTasks.get(hostname); + + if (set == null) { + set = new TreeSet(); + set.add(id); + badNodesToFilteredTasks.put(hostname, set); + }else{ + set.add(id); + } + } + } + } + } + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java?rev=816052&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java Thu Sep 17 05:04:21 2009 @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobID; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; + +/** + * Event to record successful completion of job + * + */ +public class JobFinishedEvent implements HistoryEvent { + + private EventCategory category; + private JobID jobid; + private long finishTime; + private int finishedMaps; + private int finishedReduces; + private int failedMaps; + private int failedReduces; + private Counters counters; + + enum EventFields { EVENT_CATEGORY, + JOB_ID, + FINISH_TIME, + FINISHED_MAPS, + FINISHED_REDUCES, + FAILED_MAPS, + FAILED_REDUCES, + COUNTERS } + + JobFinishedEvent() { + } + + /** + * Create an event to record successful job completion + * @param id Job ID + * @param finishTime Finish time of the job + * @param finishedMaps The number of finished maps + * @param finishedReduces The number of finished reduces + * @param failedMaps The number of failed maps + * @param failedReduces The number of failed reduces + * @param counters Counters for the job + */ + public JobFinishedEvent(JobID id, long finishTime, + int finishedMaps, int finishedReduces, + int failedMaps, int failedReduces, + Counters counters) { + this.jobid = id; + this.finishTime = finishTime; + this.finishedMaps = finishedMaps; + this.finishedReduces = finishedReduces; + this.failedMaps = failedMaps; + this.failedReduces = failedReduces; + this.counters = counters; + this.category = EventCategory.JOB; + } + + /** Get the Event Category */ + public EventCategory getEventCategory() { return category; } + /** Get the Job ID */ + public JobID getJobid() { return jobid; } + /** Get the job finish time */ + public long getFinishTime() { return finishTime; } + /** Get the number of finished maps for the job */ + public int getFinishedMaps() { return finishedMaps; } 
+ /** Get the number of finished reducers for the job */ + public int getFinishedReduces() { return finishedReduces; } + /** Get the number of failed maps for the job */ + public int getFailedMaps() { return failedMaps; } + /** Get the number of failed reducers for the job */ + public int getFailedReduces() { return failedReduces; } + /** Get the counters for the job */ + public Counters getCounters() { return counters; } + /** Get the event type */ + public EventType getEventType() { + return EventType.JOB_FINISHED; + } + + public void readFields(JsonParser jp) throws IOException { + if (jp.nextToken() != JsonToken.START_OBJECT) { + throw new IOException("Unexpected token while reading"); + } + + while (jp.nextToken() != JsonToken.END_OBJECT) { + String fieldname = jp.getCurrentName(); + jp.nextToken(); // move to value + switch (Enum.valueOf(EventFields.class, fieldname)) { + case EVENT_CATEGORY: + category = Enum.valueOf(EventCategory.class, jp.getText()); + break; + case JOB_ID: + jobid = JobID.forName(jp.getText()); + break; + case FINISH_TIME: + finishTime = jp.getLongValue(); + break; + case FINISHED_MAPS: + finishedMaps = jp.getIntValue(); + break; + case FINISHED_REDUCES: + finishedReduces = jp.getIntValue(); + break; + case FAILED_MAPS: + failedMaps = jp.getIntValue(); + break; + case FAILED_REDUCES: + failedReduces = jp.getIntValue(); + break; + case COUNTERS: + counters = EventReader.readCounters(jp); + break; + default: + throw new IOException("Unrecognized field '"+fieldname+"'!"); + } + } + } + + public void writeFields(JsonGenerator gen) throws IOException { + gen.writeStartObject(); + gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), + category.toString()); + gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString()); + gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime); + gen.writeNumberField(EventFields.FINISHED_MAPS.toString(), finishedMaps); + gen.writeNumberField(EventFields.FINISHED_REDUCES.toString(), + 
finishedReduces); + gen.writeNumberField(EventFields.FAILED_MAPS.toString(), failedMaps); + gen.writeNumberField(EventFields.FAILED_REDUCES.toString(), + failedReduces); + EventWriter.writeCounters(counters, gen); + gen.writeEndObject(); + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java?rev=816052&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistory.java Thu Sep 17 05:04:21 2009 @@ -0,0 +1,519 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobTracker; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.util.StringUtils; + +/** + * JobHistory is the class that is responsible for creating and maintaining + * job history information. 
+ * + */ +public class JobHistory { + + final Log LOG = LogFactory.getLog(JobHistory.class); + + private long jobHistoryBlockSize; + private Map fileMap; + private ThreadPoolExecutor executor = null; + static final FsPermission HISTORY_DIR_PERMISSION = + FsPermission.createImmutable((short) 0750); // rwxr-x--- + + public static final FsPermission HISTORY_FILE_PERMISSION = + FsPermission.createImmutable((short) 0740); // rwxr----- + + private JobTracker jobTracker; + + static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L; //week + private FileSystem logDirFs; // log Dir FS + private FileSystem doneDirFs; // done Dir FS + + private Path logDir = null; + private Path done = null; // folder for completed jobs + + public static final String OLD_SUFFIX = ".old"; + + // Version string that will prefix all History Files + public static final String HISTORY_VERSION = "1.0"; + + private HistoryCleaner historyCleanerThread = null; + + /** + * Initialize Job History Module + * @param jt Job Tracker handle + * @param conf Configuration + * @param hostname Host name of JT + * @param jobTrackerStartTime Start time of JT + * @throws IOException + */ + public void init(JobTracker jt, JobConf conf, String hostname, + long jobTrackerStartTime) throws IOException { + + // Get and create the log folder + String logDirLoc = conf.get("hadoop.job.history.location" , + "file:///" + + new File(System.getProperty("hadoop.log.dir")).getAbsolutePath() + + File.separator + "history"); + + LOG.info("History log directory is " + logDirLoc); + + logDir = new Path(logDirLoc); + logDirFs = logDir.getFileSystem(conf); + + if (!logDirFs.exists(logDir)){ + if (!logDirFs.mkdirs(logDir, + new FsPermission(HISTORY_DIR_PERMISSION))) { + throw new IOException("Mkdirs failed to create " + + logDir.toString()); + } + } + conf.set("hadoop.job.history.location", logDirLoc); + + jobHistoryBlockSize = + conf.getLong("mapred.jobtracker.job.history.block.size", + 3 * 1024 * 1024); + + jobTracker = 
jt; + + fileMap = new HashMap (); + } + + /** Initialize the done directory and start the history cleaner thread */ + public void initDone(JobConf conf, FileSystem fs) throws IOException { + //if completed job history location is set, use that + String doneLocation = + conf.get("mapred.job.tracker.history.completed.location"); + if (doneLocation != null) { + done = fs.makeQualified(new Path(doneLocation)); + doneDirFs = fs; + } else { + done = logDirFs.makeQualified(new Path(logDir, "done")); + doneDirFs = logDirFs; + } + + //If not already present create the done folder with appropriate + //permission + if (!doneDirFs.exists(done)) { + LOG.info("Creating DONE folder at "+ done); + if (! doneDirFs.mkdirs(done, + new FsPermission(HISTORY_DIR_PERMISSION))) { + throw new IOException("Mkdirs failed to create " + done.toString()); + } + } + LOG.info("Inited the done directory to " + done.toString()); + + moveOldFiles(); + startFileMoverThreads(); + + // Start the History Cleaner Thread + long maxAgeOfHistoryFiles = conf.getLong( + "mapreduce.cluster.jobhistory.maxage", DEFAULT_HISTORY_MAX_AGE); + historyCleanerThread = new HistoryCleaner(maxAgeOfHistoryFiles); + historyCleanerThread.start(); + } + + /** + * Move the completed job into the completed folder. + * This assumes that the job history file is closed and + * all operations on the job history file is complete. + * This *should* be the last call to job history for a given job. 
+ */ + public void markCompleted(JobID id) throws IOException { + moveToDone(id); + } + + /** Shut down JobHistory after stopping the History cleaner */ + public void shutDown() { + LOG.info("Interrupting History Cleaner"); + historyCleanerThread.interrupt(); + try { + historyCleanerThread.join(); + } catch (InterruptedException e) { + LOG.info("Error with shutting down history thread"); + } + } + + /** Get the done directory */ + public synchronized String getDoneJobHistoryFileName(JobConf jobConf, + JobID id) throws IOException { + if (done == null) { + return null; + } + return getJobHistoryFileName(jobConf, id, done, doneDirFs); + } + + /** + * Get the history location + */ + public Path getJobHistoryLocation() { + return logDir; + } + + /** + * Get the history location for completed jobs + */ + public Path getCompletedJobHistoryLocation() { + return done; + } + + /** + * @param dir The directory where to search. + */ + private synchronized String getJobHistoryFileName(JobConf jobConf, + JobID id, Path dir, FileSystem fs) + throws IOException { + String user = getUserName(jobConf); + // Make the pattern matching the job's history file + final Pattern historyFilePattern = + Pattern.compile(id.toString() + "_" + user + "+"); + // a path filter that matches the parts of the filenames namely + // - job-id, user name + PathFilter filter = new PathFilter() { + public boolean accept(Path path) { + String fileName = path.getName(); + return historyFilePattern.matcher(fileName).find(); + } + }; + + FileStatus[] statuses = fs.listStatus(dir, filter); + String filename = null; + if (statuses.length == 0) { + LOG.info("Nothing to recover for job " + id); + } else { + filename = statuses[0].getPath().getName(); + LOG.info("Recovered job history filename for job " + id + " is " + + filename); + } + return filename; + } + + String getNewJobHistoryFileName(JobConf conf, JobID jobId) { + return jobId.toString() + + "_" + getUserName(conf); + } + + /** + * Get the job history 
file path given the history filename + */ + private Path getJobHistoryLogLocation(String logFileName) { + return logDir == null ? null : new Path(logDir, logFileName); + } + + /** + * Create an event writer for the Job represented by the jobID. + * This should be the first call to history for a job + * @param jobId + * @param jobConf + * @throws IOException + */ + public void setupEventWriter(JobID jobId, JobConf jobConf) + throws IOException { + String logFileName = getNewJobHistoryFileName(jobConf, jobId); + + Path logFile = getJobHistoryLogLocation(logFileName); + + if (logDir == null) { + LOG.info("Log Directory is null, returning"); + throw new IOException("Missing Log Directory for History"); + } + + int defaultBufferSize = + logDirFs.getConf().getInt("io.file.buffer.size", 4096); + + LOG.info("SetupWriter, creating file " + logFile); + + FSDataOutputStream out = logDirFs.create(logFile, + new FsPermission(JobHistory.HISTORY_FILE_PERMISSION), + EnumSet.of(CreateFlag.OVERWRITE), + defaultBufferSize, + logDirFs.getDefaultReplication(), + jobHistoryBlockSize, null); + + EventWriter writer = new EventWriter(out); + + /* Storing the job conf on the log dir */ + + Path logDirConfPath = getConfFile(jobId); + LOG.info("LogDirConfPath is " + logDirConfPath); + + FSDataOutputStream jobFileOut = null; + try { + if (logDirConfPath != null) { + defaultBufferSize = + logDirFs.getConf().getInt("io.file.buffer.size", 4096); + if (!logDirFs.exists(logDirConfPath)) { + jobFileOut = logDirFs.create(logDirConfPath, + new FsPermission(JobHistory.HISTORY_FILE_PERMISSION), + EnumSet.of(CreateFlag.OVERWRITE), + defaultBufferSize, + logDirFs.getDefaultReplication(), + logDirFs.getDefaultBlockSize(), null); + jobConf.writeXml(jobFileOut); + jobFileOut.close(); + } + } + } catch (IOException e) { + LOG.info("Failed to close the job configuration file " + + StringUtils.stringifyException(e)); + } + + MetaInfo fi = new MetaInfo(logFile, logDirConfPath, writer); + fileMap.put(jobId, fi); 
+ } + + /** Close the event writer for this id */ + public void closeWriter(JobID id) { + try { + EventWriter writer = getWriter(id); + writer.close(); + } catch (IOException e) { + LOG.info("Error closing writer for JobID: " + id); + } + } + + + /** + * Get the JsonEventWriter for the specified Job Id + * @param jobId + * @return + * @throws IOException if a writer is not available + */ + private EventWriter getWriter(final JobID jobId) throws IOException { + EventWriter writer = null; + MetaInfo mi = fileMap.get(jobId); + if (mi == null || (writer = mi.getEventWriter()) == null) { + throw new IOException("History File does not exist for JobID"); + } + return writer; + } + + /** + * Method to log the specified event + * @param event The event to log + * @param id The Job ID of the event + */ + public void logEvent(HistoryEvent event, JobID id) { + try { + EventWriter writer = getWriter(id); + writer.write(event); + } catch (IOException e) { + LOG.error("Error creating writer, " + e.getMessage()); + } + } + + + private void moveToDoneNow(Path fromPath, Path toPath) throws IOException { + //check if path exists, in case of retries it may not exist + if (logDirFs.exists(fromPath)) { + LOG.info("Moving " + fromPath.toString() + " to " + + toPath.toString()); + doneDirFs.moveFromLocalFile(fromPath, toPath); + doneDirFs.setPermission(toPath, + new FsPermission(JobHistory.HISTORY_FILE_PERMISSION)); + } + } + + private void startFileMoverThreads() { + executor = new ThreadPoolExecutor(1, 3, 1, + TimeUnit.HOURS, new LinkedBlockingQueue()); + } + + Path getConfFile(JobID jobId) { + Path jobFilePath = null; + if (logDir != null) { + jobFilePath = new Path(logDir + File.separator + + jobId.toString() + "_conf.xml"); + } + return jobFilePath; + } + + private void moveOldFiles() throws IOException { + //move the log files remaining from last run to the DONE folder + //suffix the file name based on Jobtracker identifier so that history + //files with same job id don't get over 
written in case of recovery. + FileStatus[] files = logDirFs.listStatus(logDir); + String jtIdentifier = jobTracker.getTrackerIdentifier(); + String fileSuffix = "." + jtIdentifier + JobHistory.OLD_SUFFIX; + for (FileStatus fileStatus : files) { + Path fromPath = fileStatus.getPath(); + if (fromPath.equals(done)) { //DONE can be a subfolder of log dir + continue; + } + LOG.info("Moving log file from last run: " + fromPath); + Path toPath = new Path(done, fromPath.getName() + fileSuffix); + moveToDoneNow(fromPath, toPath); + } + } + + private void moveToDone(final JobID id) { + final List paths = new ArrayList(); + MetaInfo metaInfo = fileMap.get(id); + if (metaInfo == null) { + LOG.info("No file for job-history with " + id + " found in cache!"); + return; + } + + final Path historyFile = metaInfo.getHistoryFile(); + if (historyFile == null) { + LOG.info("No file for job-history with " + id + " found in cache!"); + } else { + paths.add(historyFile); + } + + final Path confPath = metaInfo.getConfFile(); + if (confPath == null) { + LOG.info("No file for jobconf with " + id + " found in cache!"); + } else { + paths.add(confPath); + } + + executor.execute(new Runnable() { + + public void run() { + //move the files to DONE folder + try { + for (Path path : paths) { + moveToDoneNow(path, new Path(done, path.getName())); + } + } catch (Throwable e) { + LOG.error("Unable to move history file to DONE folder.", e); + } + String historyFileDonePath = null; + if (historyFile != null) { + historyFileDonePath = new Path(done, + historyFile.getName()).toString(); + } + + jobTracker.retireJob(org.apache.hadoop.mapred.JobID.downgrade(id), + historyFileDonePath); + + //purge the job from the cache + fileMap.remove(id); + } + + }); + } + + private String getUserName(JobConf jobConf) { + String user = jobConf.getUser(); + if (user == null) { + user = ""; + } + return user; + } + + private static class MetaInfo { + private Path historyFile; + private Path confFile; + private EventWriter 
writer; + + MetaInfo(Path historyFile, Path conf, EventWriter writer) { + this.historyFile = historyFile; + this.confFile = conf; + this.writer = writer; + } + + Path getHistoryFile() { return historyFile; } + Path getConfFile() { return confFile; } + EventWriter getEventWriter() { return writer; } + } + + /** + * Delete history files older than a specified time duration. + */ + class HistoryCleaner extends Thread { + static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L; + private long cleanupFrequency; + private long maxAgeOfHistoryFiles; + + public HistoryCleaner(long maxAge) { + setName("Thread for cleaning up History files"); + setDaemon(true); + this.maxAgeOfHistoryFiles = maxAge; + cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles); + LOG.info("Job History Cleaner Thread started." + + " MaxAge is " + + maxAge + " ms(" + ((float)maxAge)/(ONE_DAY_IN_MS) + " days)," + + " Cleanup Frequency is " + + + cleanupFrequency + " ms (" + + ((float)cleanupFrequency)/ONE_DAY_IN_MS + " days)"); + } + + @Override + public void run(){ + + while (true) { + try { + doCleanup(); + Thread.sleep(cleanupFrequency); + } + catch (InterruptedException e) { + LOG.info("History Cleaner thread exiting"); + return; + } + catch (Throwable t) { + LOG.warn("History cleaner thread threw an exception", t); + } + } + } + + private void doCleanup() { + long now = System.currentTimeMillis(); + try { + FileStatus[] historyFiles = doneDirFs.listStatus(done); + if (historyFiles != null) { + for (FileStatus f : historyFiles) { + if (now - f.getModificationTime() > maxAgeOfHistoryFiles) { + doneDirFs.delete(f.getPath(), true); + LOG.info("Deleting old history file : " + f.getPath()); + } + } + } + } catch (IOException ie) { + LOG.info("Error cleaning up history directory" + + StringUtils.stringifyException(ie)); + } + } + } +} Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=816052&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Thu Sep 17 05:04:21 2009 @@ -0,0 +1,526 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapred.JobPriority; +import org.apache.hadoop.mapred.JobStatus; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapred.TaskStatus; +import org.apache.hadoop.mapreduce.TaskType; + +/** + * Default Parser for the JobHistory files. 
Typical usage is
 * <pre>
 *   JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
 *   JobHistoryParser.JobInfo jobInfo = parser.parse();
 * </pre>
 */
public class JobHistoryParser {

  private final FSDataInputStream in;
  JobInfo info = null;

  /**
   * Create a job history parser for the given history file using the
   * given file system.
   * @param fs the file system that holds the history file
   * @param file path of the history file, as a string
   * @throws IOException if the file cannot be opened
   */
  public JobHistoryParser(FileSystem fs, String file) throws IOException {
    this(fs, new Path(file));
  }

  /**
   * Create the job history parser for the given history file using the
   * given file system.
   * @param fs the file system that holds the history file
   * @param historyFile path of the history file
   * @throws IOException if the file cannot be opened
   */
  public JobHistoryParser(FileSystem fs, Path historyFile)
  throws IOException {
    in = fs.open(historyFile);
  }

  /**
   * Create the history parser based on the input stream.
   * @param in stream to read history events from; it is closed by
   *           {@link #parse()}
   */
  public JobHistoryParser(FSDataInputStream in) {
    this.in = in;
  }

  /**
   * Parse the entire history file and populate the JobInfo object.
   * The first successful invocation populates the object; subsequent
   * calls return the already parsed object.
   * The input stream is closed on return, whether or not parsing succeeded.
   * If parsing fails, the partially populated JobInfo is discarded, so a
   * later call never mistakes it for a complete result.
   * @return The populated jobInfo object
   * @throws IOException if the history cannot be read or decoded
   */
  public synchronized JobInfo parse() throws IOException {
    if (info != null) {
      return info;
    }

    info = new JobInfo();
    boolean succeeded = false;
    try {
      // The reader is created inside the try so that the stream is still
      // closed if the reader itself fails to initialize.
      EventReader reader = new EventReader(in);
      HistoryEvent event;
      while ((event = reader.getNextEvent()) != null) {
        handleEvent(event);
      }
      succeeded = true;
    } finally {
      if (!succeeded) {
        // Do not cache a half-populated JobInfo.
        info = null;
      }
      in.close();
    }
    return info;
  }

  /** Dispatch a single history event to the handler for its type. */
  private void handleEvent(HistoryEvent event) throws IOException {
    EventType type = event.getEventType();

    switch (type) {
    case JOB_SUBMITTED:
      handleJobSubmittedEvent((JobSubmittedEvent)event);
      break;
    case JOB_STATUS_CHANGED:
      // Status-change events are not aggregated into JobInfo.
      break;
    case JOB_INFO_CHANGED:
      handleJobInfoChangeEvent((JobInfoChangeEvent) event);
      break;
    case JOB_INITED:
      handleJobInitedEvent((JobInitedEvent) event);
      break;
    case JOB_PRIORITY_CHANGED:
      handleJobPriorityChangeEvent((JobPriorityChangeEvent) event);
      break;
    case JOB_FAILED:
    case JOB_KILLED:
      handleJobFailedEvent((JobUnsuccessfulCompletionEvent) event);
      break;
    case JOB_FINISHED:
      handleJobFinishedEvent((JobFinishedEvent)event);
      break;
    case TASK_STARTED:
      handleTaskStartedEvent((TaskStartedEvent) event);
      break;
    case TASK_FAILED:
      handleTaskFailedEvent((TaskFailedEvent) event);
      break;
    case TASK_UPDATED:
      handleTaskUpdatedEvent((TaskUpdatedEvent) event);
      break;
    case TASK_FINISHED:
      handleTaskFinishedEvent((TaskFinishedEvent) event);
      break;
    case MAP_ATTEMPT_STARTED:
    case CLEANUP_ATTEMPT_STARTED:
    case REDUCE_ATTEMPT_STARTED:
    case SETUP_ATTEMPT_STARTED:
      handleTaskAttemptStartedEvent((TaskAttemptStartedEvent) event);
      break;
    case MAP_ATTEMPT_FAILED:
    case CLEANUP_ATTEMPT_FAILED:
    case REDUCE_ATTEMPT_FAILED:
    case SETUP_ATTEMPT_FAILED:
    case MAP_ATTEMPT_KILLED:
    case CLEANUP_ATTEMPT_KILLED:
    case REDUCE_ATTEMPT_KILLED:
    case SETUP_ATTEMPT_KILLED:
      handleTaskAttemptFailedEvent(
          (TaskAttemptUnsuccessfulCompletionEvent) event);
      break;
    case MAP_ATTEMPT_FINISHED:
      handleMapAttemptFinishedEvent((MapAttemptFinishedEvent) event);
      break;
    case REDUCE_ATTEMPT_FINISHED:
      handleReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event);
      break;
    case SETUP_ATTEMPT_FINISHED:
    case CLEANUP_ATTEMPT_FINISHED:
      handleTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
      break;
    default:
      break;
    }
  }

  private void handleTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) {
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
    TaskAttemptInfo attemptInfo =
      taskInfo.attemptsMap.get(event.getAttemptId());
    attemptInfo.finishTime = event.getFinishTime();
    attemptInfo.status = event.getTaskStatus();
    attemptInfo.state = event.getState();
    attemptInfo.counters = event.getCounters();
    attemptInfo.hostname = event.getHostname();
  }

  private void handleReduceAttemptFinishedEvent
  (ReduceAttemptFinishedEvent event) {
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
    TaskAttemptInfo attemptInfo =
      taskInfo.attemptsMap.get(event.getAttemptId());
    attemptInfo.finishTime = event.getFinishTime();
    attemptInfo.status = event.getTaskStatus();
    attemptInfo.state = event.getState();
    attemptInfo.shuffleFinishTime = event.getShuffleFinishTime();
    attemptInfo.sortFinishTime = event.getSortFinishTime();
    attemptInfo.counters = event.getCounters();
    attemptInfo.hostname = event.getHostname();
  }

  private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
    TaskAttemptInfo attemptInfo =
      taskInfo.attemptsMap.get(event.getAttemptId());
    attemptInfo.finishTime = event.getFinishTime();
    attemptInfo.status = event.getTaskStatus();
    attemptInfo.state = event.getState();
    attemptInfo.mapFinishTime = event.getMapFinishTime();
    attemptInfo.counters = event.getCounters();
    attemptInfo.hostname = event.getHostname();
  }

  private void handleTaskAttemptFailedEvent(
      TaskAttemptUnsuccessfulCompletionEvent event) {
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
    TaskAttemptInfo attemptInfo =
      taskInfo.attemptsMap.get(event.getTaskAttemptId());
    attemptInfo.finishTime = event.getFinishTime();
    attemptInfo.error = event.getError();
    attemptInfo.status = event.getTaskStatus();
    attemptInfo.hostname = event.getHostname();
  }

  private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
    TaskAttemptID attemptId = event.getTaskAttemptId();
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());

    TaskAttemptInfo attemptInfo = new TaskAttemptInfo();
    attemptInfo.startTime = event.getStartTime();
    attemptInfo.attemptId = attemptId;
    attemptInfo.httpPort = event.getHttpPort();
    attemptInfo.trackerName = event.getTrackerName();
    attemptInfo.taskType = event.getTaskType();

    taskInfo.attemptsMap.put(attemptId, attemptInfo);
  }

  private void handleTaskFinishedEvent(TaskFinishedEvent event) {
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
    taskInfo.counters = event.getCounters();
    taskInfo.finishTime = event.getFinishTime();
    taskInfo.status = TaskStatus.State.SUCCEEDED.toString();
  }

  private void handleTaskUpdatedEvent(TaskUpdatedEvent event) {
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
    taskInfo.finishTime = event.getFinishTime();
  }

  private void handleTaskFailedEvent(TaskFailedEvent event) {
    TaskInfo taskInfo = info.tasksMap.get(event.getTaskId());
    taskInfo.status = TaskStatus.State.FAILED.toString();
    taskInfo.finishTime = event.getFinishTime();
    taskInfo.error = event.getError();
    taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
  }

  private void handleTaskStartedEvent(TaskStartedEvent event) {
    TaskInfo taskInfo = new TaskInfo();
    taskInfo.taskId = event.getTaskId();
    taskInfo.startTime = event.getStartTime();
    taskInfo.taskType = event.getTaskType();
    taskInfo.splitLocations = event.getSplitLocations();
    info.tasksMap.put(event.getTaskId(), taskInfo);
  }

  private void handleJobFailedEvent(JobUnsuccessfulCompletionEvent event) {
    info.finishTime = event.getFinishTime();
    info.finishedMaps = event.getFinishedMaps();
    info.finishedReduces = event.getFinishedReduces();
    info.jobStatus = event.getStatus();
  }

  private void handleJobFinishedEvent(JobFinishedEvent event) {
    info.finishTime = event.getFinishTime();
    info.finishedMaps = event.getFinishedMaps();
    info.finishedReduces = event.getFinishedReduces();
    info.failedMaps = event.getFailedMaps();
    info.failedReduces = event.getFailedReduces();
    info.counters = event.getCounters();
    info.jobStatus = JobStatus.getJobRunState(JobStatus.SUCCEEDED);
  }

  private void handleJobPriorityChangeEvent(JobPriorityChangeEvent event) {
    info.priority = event.getPriority();
  }

  private void handleJobInitedEvent(JobInitedEvent event) {
    info.launchTime = event.getLaunchTime();
    info.totalMaps = event.getTotalMaps();
    info.totalReduces = event.getTotalReduces();
  }

  private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
    info.submitTime = event.getSubmitTime();
    info.launchTime = event.getLaunchTime();
  }

  private void handleJobSubmittedEvent(JobSubmittedEvent event) {
    info.jobid = event.getJobId();
    info.jobname = event.getJobName();
    info.username = event.getUserName();
    info.submitTime = event.getSubmitTime();
    info.jobConfPath = event.getJobConfPath();
  }

  /**
   * The class where job information is aggregated into after parsing
   */
  public static class JobInfo {
    long submitTime;
    long finishTime;
    JobID jobid;
    String username;
    String jobname;
    String jobConfPath;
    long launchTime;
    int totalMaps;
    int totalReduces;
    int failedMaps;
    int failedReduces;
    int finishedMaps;
    int finishedReduces;
    String jobStatus;
    // Only set by a JOB_FINISHED event; stays null for failed/killed jobs.
    Counters counters;
    // Only set by a JOB_PRIORITY_CHANGED event; may stay null.
    JobPriority priority;

    Map<TaskID, TaskInfo> tasksMap;

    /** Create a job info object where job information will be stored
     * after a parse
     */
    public JobInfo() {
      submitTime = launchTime = finishTime = -1;
      totalMaps = totalReduces = failedMaps = failedReduces = 0;
      finishedMaps = finishedReduces = 0;
      username = jobname = jobConfPath = "";
      tasksMap = new HashMap<TaskID, TaskInfo>();
    }

    /** Print all the job information */
    public void printAll() {
      System.out.println("JOBNAME: " + jobname);
      System.out.println("USERNAME: " + username);
      System.out.println("SUBMIT_TIME: " + submitTime);
      System.out.println("LAUNCH_TIME: " + launchTime);
      System.out.println("JOB_STATUS: " + jobStatus);
      System.out.println("PRIORITY: " + priority);
      System.out.println("TOTAL_MAPS: " + totalMaps);
      System.out.println("TOTAL_REDUCES: " + totalReduces);
      // String concatenation prints "null" instead of throwing when the
      // job never finished successfully and so has no counters.
      System.out.println("COUNTERS: " + counters);

      for (TaskInfo ti: tasksMap.values()) {
        ti.printAll();
      }
    }

    /** Get the job submit time */
    public long getSubmitTime() { return submitTime; }
    /** Get the job finish time */
    public long getFinishTime() { return finishTime; }
    /** Get the job id */
    public JobID getJobId() { return jobid; }
    /** Get the user name */
    public String getUsername() { return username; }
    /** Get the job name */
    public String getJobname() { return jobname; }
    /** Get the path for the job configuration file */
    public String getJobConfPath() { return jobConfPath; }
    /** Get the job launch time */
    public long getLaunchTime() { return launchTime; }
    /** Get the total number of maps */
    public long getTotalMaps() { return totalMaps; }
    /** Get the total number of reduces */
    public long getTotalReduces() { return totalReduces; }
    /** Get the total number of failed maps */
    public long getFailedMaps() { return failedMaps; }
    /** Get the number of failed reduces */
    public long getFailedReduces() { return failedReduces; }
    /** Get the number of finished maps */
    public long getFinishedMaps() { return finishedMaps; }
    /** Get the number of finished reduces */
    public long getFinishedReduces() { return finishedReduces; }
    /** Get the job status */
    public String getJobStatus() { return jobStatus; }
    /** Get the counters for the job; null if the job did not finish */
    public Counters getCounters() { return counters; }
    /** Get the map of all tasks in this job */
    public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
    /**
     * Get the priority of this job, or null if no priority change was
     * recorded in the history.
     */
    public String getPriority() {
      return priority == null ? null : priority.toString();
    }
  }

  /**
   * TaskInformation is aggregated in this class after parsing
   */
  public static class TaskInfo {
    TaskID taskId;
    long startTime;
    long finishTime;
    TaskType taskType;
    String splitLocations;
    // Only set by a TASK_FINISHED event; stays null for failed tasks.
    Counters counters;
    String status;
    String error;
    TaskAttemptID failedDueToAttemptId;
    Map<TaskAttemptID, TaskAttemptInfo> attemptsMap;

    public TaskInfo() {
      startTime = finishTime = -1;
      error = splitLocations = "";
      attemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
    }

    /** Print this task and all of its attempts */
    public void printAll() {
      System.out.println("TASK_ID:" + taskId.toString());
      System.out.println("START_TIME: " + startTime);
      System.out.println("FINISH_TIME:" + finishTime);
      System.out.println("TASK_TYPE:" + taskType);
      System.out.println("COUNTERS:" + counters);

      for (TaskAttemptInfo tinfo: attemptsMap.values()) {
        tinfo.printAll();
      }
    }

    /** Get the Task ID */
    public TaskID getTaskId() { return taskId; }
    /** Get the start time of this task */
    public long getStartTime() { return startTime; }
    /** Get the finish time of this task */
    public long getFinishTime() { return finishTime; }
    /** Get the task type */
    public TaskType getTaskType() { return taskType; }
    /** Get the split locations */
    public String getSplitLocations() { return splitLocations; }
    /** Get the counters for this task */
    public Counters getCounters() { return counters; }
    /** Get the task status */
    public String getTaskStatus() { return status; }
    /** Get the attempt Id that caused this task to fail */
    public TaskAttemptID getFailedDueToAttemptId() {
      return failedDueToAttemptId;
    }
    /** Get the error */
    public String getError() { return error; }
    /** Get the map of all attempts for this task */
    public Map<TaskAttemptID, TaskAttemptInfo> getAllTaskAttempts() {
      return attemptsMap;
    }
  }

  /**
   * Task Attempt Information is aggregated in this class after parsing
   */
  public static class TaskAttemptInfo {
    TaskAttemptID attemptId;
    long startTime;
    long finishTime;
    long shuffleFinishTime;
    long sortFinishTime;
    long mapFinishTime;
    String error;
    String status;
    String state;
    TaskType taskType;
    String trackerName;
    // Only set by the *_ATTEMPT_FINISHED handlers; null for failed attempts.
    Counters counters;
    int httpPort;
    String hostname;

    /** Create a Task Attempt Info which will store attempt level information
     * on a history parse.
     */
    public TaskAttemptInfo() {
      startTime = finishTime = shuffleFinishTime = sortFinishTime =
        mapFinishTime = -1;
      error =  state =  trackerName = hostname = "";
      httpPort = -1;
    }
    /**
     * Print all the information about this attempt.
     */
    public void printAll() {
      System.out.println("ATTEMPT_ID:" + attemptId.toString());
      System.out.println("START_TIME: " + startTime);
      System.out.println("FINISH_TIME:" + finishTime);
      System.out.println("ERROR:" + error);
      System.out.println("TASK_STATUS:" + status);
      System.out.println("STATE:" + state);
      System.out.println("TASK_TYPE:" + taskType);
      System.out.println("TRACKER_NAME:" + trackerName);
      System.out.println("HTTP_PORT:" + httpPort);
      System.out.println("COUNTERS:" + counters);
    }

    /** Get the attempt Id */
    public TaskAttemptID getAttemptId() { return attemptId; }
    /** Get the start time of the attempt */
    public long getStartTime() { return startTime; }
    /** Get the finish time of the attempt */
    public long getFinishTime() { return finishTime; }
    /** Get the shuffle finish time. Applicable only for reduce attempts */
    public long getShuffleFinishTime() { return shuffleFinishTime; }
    /** Get the sort finish time. Applicable only for reduce attempts */
    public long getSortFinishTime() { return sortFinishTime; }
    /** Get the map finish time. Applicable only for map attempts */
    public long getMapFinishTime() { return mapFinishTime; }
    /** Get the error string */
    public String getError() { return error; }
    /** Get the state */
    public String getState() { return state; }
    /** Get the task status */
    public String getTaskStatus() { return status; }
    /** Get the task type */
    public TaskType getTaskType() { return taskType; }
    /** Get the tracker name where the attempt executed */
    public String getTrackerName() { return trackerName; }
    /** Get the host name */
    public String getHostname() { return hostname; }
    /** Get the counters for the attempt */
    public Counters getCounters() { return counters; }
    /** Get the HTTP port for the tracker */
    public int getHttpPort() { return httpPort; }
  }
}
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java?rev=816052&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java (added) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java Thu Sep 17 05:04:21 2009 @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.jobhistory;

import java.io.IOException;

import org.apache.hadoop.mapreduce.JobID;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;

/**
 * Event to record changes in the submit and launch time of
 * a job
 */
public class JobInfoChangeEvent implements HistoryEvent {

  private EventCategory category;
  private JobID jobid;
  private long submitTime;
  private long launchTime;

  enum EventFields { EVENT_CATEGORY,
                     JOB_ID,
                     SUBMIT_TIME,
                     LAUNCH_TIME }

  /**
   * Create a event to record the submit and launch time of a job
   * @param id Job Id
   * @param submitTime Submit time of the job
   * @param launchTime Launch time of the job
   */
  public JobInfoChangeEvent(JobID id, long submitTime, long launchTime) {
    this.jobid = id;
    this.submitTime = submitTime;
    this.launchTime = launchTime;
    this.category = EventCategory.JOB;
  }

  /** Create an empty event, to be populated by {@link #readFields}. */
  JobInfoChangeEvent() { }

  /** Get the Job ID */
  public JobID getJobId() { return jobid; }
  /** Get the Job submit time */
  public long getSubmitTime() { return submitTime; }
  /** Get the Job launch time */
  public long getLaunchTime() { return launchTime; }
  /** Get the event category */
  public EventCategory getEventCategory() { return category; }
  /** Get the event type */
  public EventType getEventType() {
    return EventType.JOB_INFO_CHANGED;
  }

  /**
   * Populate this event from a JSON object written by {@link #writeFields}.
   * @param jp parser positioned just before the event's START_OBJECT token
   * @throws IOException if the input is malformed, ends prematurely, or
   *         contains an unknown field name or event category
   */
  public void readFields(JsonParser jp) throws IOException {
    if (jp.nextToken() != JsonToken.START_OBJECT) {
      throw new IOException("Unexpected Token while reading");
    }

    JsonToken token;
    while ((token = jp.nextToken()) != JsonToken.END_OBJECT) {
      if (token == null) {
        // nextToken() returns null at end of input; without this check the
        // loop would go on to look up a null field name.
        throw new IOException("Unexpected end of input while reading");
      }
      String fieldName = jp.getCurrentName();
      jp.nextToken(); // Move to the value
      switch (parseEnum(EventFields.class, fieldName,
                        "Unrecognized field '"+fieldName+"'!")) {
        case EVENT_CATEGORY:
          String categoryName = jp.getText();
          category = parseEnum(EventCategory.class, categoryName,
              "Unrecognized event category '"+categoryName+"'!");
          break;
        case JOB_ID:
          jobid = JobID.forName(jp.getText());
          break;
        case SUBMIT_TIME:
          submitTime = jp.getLongValue();
          break;
        case LAUNCH_TIME:
          launchTime = jp.getLongValue();
          break;
        default:
          // Reached only if EventFields gains a constant without a case here.
          throw new IOException("Unrecognized field '"+fieldName+"'!");
      }
    }
  }

  /**
   * Look up an enum constant by name, reporting an unknown name as an
   * IOException (the type readFields declares) rather than letting
   * Enum.valueOf's IllegalArgumentException escape.
   */
  private static <E extends Enum<E>> E parseEnum(Class<E> type, String name,
      String message) throws IOException {
    try {
      return Enum.valueOf(type, name);
    } catch (IllegalArgumentException e) {
      IOException ioe = new IOException(message);
      ioe.initCause(e);
      throw ioe;
    }
  }

  /**
   * Serialize this event as a single JSON object.
   * @param gen generator to write the object to
   * @throws IOException if writing fails
   */
  public void writeFields(JsonGenerator gen) throws IOException {
    gen.writeStartObject();
    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
        category.toString());
    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
    gen.writeNumberField(EventFields.SUBMIT_TIME.toString(), submitTime);
    gen.writeNumberField(EventFields.LAUNCH_TIME.toString(), launchTime);
    gen.writeEndObject();
  }
}