Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r911519 [2/3] - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapreduce/jobhistory/ src/test/mapred/org/apache/hadoop/tools/rumen/ src/test/tools/data/rumen/small-trace-test/ src/tools/org/apache/hadoop/tools/rumen/
Date: Thu, 18 Feb 2010 18:43:30 -0000
To: mapreduce-commits@hadoop.apache.org
From: cdouglas@apache.org
Message-Id: <20100218184330.BAE0823889B9@eris.apache.org>

Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobBuilder.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,686 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.tools.rumen; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.StringTokenizer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; +import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinished; +import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent; +import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinished; +import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinished; +import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskFinished; +import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent; +import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values; +import org.apache.hadoop.util.StringUtils; + +/** + * {@link JobBuilder} builds one job. It processes a sequence of + * {@link HistoryEvent}s. + */ +public class JobBuilder { + private static final long BYTES_IN_MEG = + StringUtils.TraditionalBinaryPrefix.string2long("1m"); + + private String jobID; + + private boolean finalized = false; + + private LoggedJob result = new LoggedJob(); + + private Map mapTasks = new HashMap(); + private Map reduceTasks = + new HashMap(); + private Map otherTasks = + new HashMap(); + + private Map attempts = + new HashMap(); + + private Map allHosts = + new HashMap(); + + /** + * The number of splits a task can have, before we ignore them all. + */ + private final static int MAXIMUM_PREFERRED_LOCATIONS = 25; + /** + * The regular expression used to parse task attempt IDs in job tracker logs + */ + private final static Pattern taskAttemptIDPattern = + Pattern.compile(".*_([0-9]+)"); + + private int[] attemptTimesPercentiles = null; + + // Use this to search within the java options to get heap sizes. + // The heap size number is in Capturing Group 1. + // The heap size order-of-magnitude suffix is in Capturing Group 2 + private static final Pattern heapPattern = + Pattern.compile("-Xmx([0-9]+[kKmMgGtT])"); + + public JobBuilder(String jobID) { + this.jobID = jobID; + } + + public String getJobID() { + return jobID; + } + + { + if (attemptTimesPercentiles == null) { + attemptTimesPercentiles = new int[19]; + + for (int i = 0; i < 19; ++i) { + attemptTimesPercentiles[i] = (i + 1) * 5; + } + } + } + + /** + * Process one {@link HistoryEvent} + * + * @param event + * The {@link HistoryEvent} to be processed. 
+ */ + public void process(HistoryEvent event) { + if (finalized) { + throw new IllegalStateException( + "JobBuilder.process(HistoryEvent event) called after LoggedJob built"); + } + + // these are in lexicographical order by class name. + if (event instanceof JobFinishedEvent) { + processJobFinishedEvent((JobFinishedEvent) event); + } else if (event instanceof JobInfoChangeEvent) { + processJobInfoChangeEvent((JobInfoChangeEvent) event); + } else if (event instanceof JobInitedEvent) { + processJobInitedEvent((JobInitedEvent) event); + } else if (event instanceof JobPriorityChangeEvent) { + processJobPriorityChangeEvent((JobPriorityChangeEvent) event); + } else if (event instanceof JobStatusChangedEvent) { + processJobStatusChangedEvent((JobStatusChangedEvent) event); + } else if (event instanceof JobSubmittedEvent) { + processJobSubmittedEvent((JobSubmittedEvent) event); + } else if (event instanceof JobUnsuccessfulCompletionEvent) { + processJobUnsuccessfulCompletionEvent((JobUnsuccessfulCompletionEvent) event); + } else if (event instanceof MapAttemptFinishedEvent) { + processMapAttemptFinishedEvent((MapAttemptFinishedEvent) event); + } else if (event instanceof ReduceAttemptFinishedEvent) { + processReduceAttemptFinishedEvent((ReduceAttemptFinishedEvent) event); + } else if (event instanceof TaskAttemptFinishedEvent) { + processTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event); + } else if (event instanceof TaskAttemptStartedEvent) { + processTaskAttemptStartedEvent((TaskAttemptStartedEvent) event); + } else if (event instanceof TaskAttemptUnsuccessfulCompletionEvent) { + processTaskAttemptUnsuccessfulCompletionEvent((TaskAttemptUnsuccessfulCompletionEvent) event); + } else if (event instanceof TaskFailedEvent) { + processTaskFailedEvent((TaskFailedEvent) event); + } else if (event instanceof TaskFinishedEvent) { + processTaskFinishedEvent((TaskFinishedEvent) event); + } else if (event instanceof TaskStartedEvent) { + processTaskStartedEvent((TaskStartedEvent) event); + } else if (event instanceof TaskUpdatedEvent) { + processTaskUpdatedEvent((TaskUpdatedEvent) event); + } else + throw new IllegalArgumentException( + "JobBuilder.process(HistoryEvent): unknown event type"); + } + + private String extract(Properties conf, String[] names, String defaultValue) { + for (String name : names) { + String result = conf.getProperty(name); + + if (result != null) { + return result; + } + } + + return defaultValue; + } + + private Integer extractMegabytes(Properties conf, String[] names) { + String javaOptions = extract(conf, names, null); + + if (javaOptions == null) { + return null; + } + + Matcher matcher = heapPattern.matcher(javaOptions); + + Integer heapMegabytes = null; + + while (matcher.find()) { + String heapSize = matcher.group(1); + heapMegabytes = + ((int) (StringUtils.TraditionalBinaryPrefix.string2long(heapSize) / BYTES_IN_MEG)); + } + + return heapMegabytes; + } + + private void maybeSetHeapMegabytes(Integer megabytes) { + if (megabytes != null) { + result.setHeapMegabytes(megabytes); + } + } + + private void maybeSetJobMapMB(Integer megabytes) { + if (megabytes != null) { + result.setJobMapMB(megabytes); + } + } + + private void maybeSetJobReduceMB(Integer megabytes) { + if (megabytes != null) { + result.setJobReduceMB(megabytes); + } + } + + /** + * Process a collection of JobConf {@link Properties}. We do not restrict it + * to be called once. It is okay to process a conf before, during or after the + * events. 
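+ * <p>
+ * For illustration only -- a minimal, hypothetical driver sketch (the
+ * history parser and the conf properties below come from elsewhere, e.g.
+ * a {@link JobHistoryParser} and a {@link JobConfigurationParser}; they
+ * are not part of this class):
+ * <pre>
+ *   JobBuilder builder = new JobBuilder("job_200902180023_0042");
+ *   builder.process(confProperties);            // JobConf properties
+ *   HistoryEvent e;
+ *   while ((e = historyParser.nextEvent()) != null) {
+ *     builder.process(e);                       // one event at a time
+ *   }
+ *   LoggedJob job = builder.build();            // accepts nothing further
+ * </pre>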
+ * + * @param conf + * The job conf properties to be added. + */ + public void process(Properties conf) { + if (finalized) { + throw new IllegalStateException( + "JobBuilder.process(Properties conf) called after LoggedJob built"); + } + + result.setQueue(extract(conf, JobConfPropertyNames.QUEUE_NAMES + .getCandidates(), "default")); + result.setJobName(extract(conf, JobConfPropertyNames.JOB_NAMES + .getCandidates(), null)); + + maybeSetHeapMegabytes(extractMegabytes(conf, + JobConfPropertyNames.TASK_JAVA_OPTS_S.getCandidates())); + maybeSetJobMapMB(extractMegabytes(conf, + JobConfPropertyNames.MAP_JAVA_OPTS_S.getCandidates())); + maybeSetJobReduceMB(extractMegabytes(conf, + JobConfPropertyNames.REDUCE_JAVA_OPTS_S.getCandidates())); + } + + /** + * Request the builder to build the final object. Once called, the + * {@link JobBuilder} would accept no more events or job-conf properties. + * + * @return Parsed {@link LoggedJob} object. + */ + public LoggedJob build() { + // The main job here is to build CDFs + finalized = true; + + // initialize all the per-job statistics gathering places + Histogram[] successfulMapAttemptTimes = + new Histogram[ParsedHost.numberOfDistances() + 1]; + for (int i = 0; i < successfulMapAttemptTimes.length; ++i) { + successfulMapAttemptTimes[i] = new Histogram(); + } + + Histogram successfulReduceAttemptTimes = new Histogram(); + Histogram[] failedMapAttemptTimes = + new Histogram[ParsedHost.numberOfDistances() + 1]; + for (int i = 0; i < failedMapAttemptTimes.length; ++i) { + failedMapAttemptTimes[i] = new Histogram(); + } + Histogram failedReduceAttemptTimes = new Histogram(); + + Histogram successfulNthMapperAttempts = new Histogram(); + // Histogram successfulNthReducerAttempts = new Histogram(); + // Histogram mapperLocality = new Histogram(); + + for (LoggedTask task : result.getMapTasks()) { + for (LoggedTaskAttempt attempt : task.getAttempts()) { + int distance = successfulMapAttemptTimes.length - 1; + Long runtime = null; + + if (attempt.getFinishTime() > 0 && attempt.getStartTime() > 0) { + runtime = attempt.getFinishTime() - attempt.getStartTime(); + + if (attempt.getResult() == Values.SUCCESS) { + LoggedLocation host = attempt.getLocation(); + + List locs = task.getPreferredLocations(); + + if (host != null && locs != null) { + for (LoggedLocation loc : locs) { + ParsedHost preferedLoc = new ParsedHost(loc); + + distance = + Math.min(distance, preferedLoc + .distance(new ParsedHost(host))); + } + + // mapperLocality.enter(distance); + } + + if (attempt.getStartTime() > 0 && attempt.getFinishTime() > 0) { + if (runtime != null) { + successfulMapAttemptTimes[distance].enter(runtime); + } + } + + String attemptID = attempt.getAttemptID(); + + if (attemptID != null) { + Matcher matcher = taskAttemptIDPattern.matcher(attemptID); + + if (matcher.matches()) { + String attemptNumberString = matcher.group(1); + + if (attemptNumberString != null) { + int attemptNumber = Integer.parseInt(attemptNumberString); + + successfulNthMapperAttempts.enter(attemptNumber); + } + } + } + } else { + if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) { + if (runtime != null) { + failedMapAttemptTimes[distance].enter(runtime); + } + } + } + } + } + } + + for (LoggedTask task : result.getReduceTasks()) { + for (LoggedTaskAttempt attempt : task.getAttempts()) { + Long runtime = attempt.getFinishTime() - attempt.getStartTime(); + + if (attempt.getFinishTime() > 0 && attempt.getStartTime() > 0) { + runtime = attempt.getFinishTime() - attempt.getStartTime(); + } + 
if (attempt.getResult() == Values.SUCCESS) { + if (runtime != null) { + successfulReduceAttemptTimes.enter(runtime); + } + } else if (attempt.getResult() == Pre21JobHistoryConstants.Values.FAILED) { + failedReduceAttemptTimes.enter(runtime); + } + } + } + + result.setFailedMapAttemptCDFs(mapCDFArrayList(failedMapAttemptTimes)); + + LoggedDiscreteCDF failedReduce = new LoggedDiscreteCDF(); + failedReduce.setCDF(failedReduceAttemptTimes, attemptTimesPercentiles, 100); + result.setFailedReduceAttemptCDF(failedReduce); + + result + .setSuccessfulMapAttemptCDFs(mapCDFArrayList(successfulMapAttemptTimes)); + + LoggedDiscreteCDF succReduce = new LoggedDiscreteCDF(); + succReduce.setCDF(successfulReduceAttemptTimes, attemptTimesPercentiles, + 100); + result.setSuccessfulReduceAttemptCDF(succReduce); + + result.setFailedMapAttemptCDFs(null); + + long totalSuccessfulAttempts = 0L; + long maxTriesToSucceed = 0L; + + for (Map.Entry ent : successfulNthMapperAttempts) { + totalSuccessfulAttempts += ent.getValue(); + maxTriesToSucceed = Math.max(maxTriesToSucceed, ent.getKey()); + } + + if (totalSuccessfulAttempts > 0L) { + double[] successAfterI = new double[(int) maxTriesToSucceed + 1]; + for (int i = 0; i < successAfterI.length; ++i) { + successAfterI[i] = 0.0D; + } + + for (Map.Entry ent : successfulNthMapperAttempts) { + successAfterI[ent.getKey().intValue()] = + ((double) ent.getValue()) / totalSuccessfulAttempts; + } + result.setMapperTriesToSucceed(successAfterI); + } else { + result.setMapperTriesToSucceed(null); + } + + return result; + } + + private ArrayList mapCDFArrayList(Histogram[] data) { + ArrayList result = new ArrayList(); + + for (Histogram hist : data) { + LoggedDiscreteCDF discCDF = new LoggedDiscreteCDF(); + discCDF.setCDF(hist, attemptTimesPercentiles, 100); + result.add(discCDF); + } + + return result; + } + + private static Values getPre21Value(String name) { + if (name.equalsIgnoreCase("JOB_CLEANUP")) { + return Values.CLEANUP; + } + if (name.equalsIgnoreCase("JOB_SETUP")) { + return Values.SETUP; + } + + return Values.valueOf(name.toUpperCase()); + } + + private void processTaskUpdatedEvent(TaskUpdatedEvent event) { + LoggedTask task = getTask(event.getTaskId().toString()); + if (task == null) { + return; + } + task.setFinishTime(event.getFinishTime()); + } + + private void processTaskStartedEvent(TaskStartedEvent event) { + LoggedTask task = + getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), true); + task.setStartTime(event.getStartTime()); + task.setPreferredLocations(preferredLocationForSplits(event + .getSplitLocations())); + } + + private void processTaskFinishedEvent(TaskFinishedEvent event) { + LoggedTask task = + getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false); + if (task == null) { + return; + } + task.setFinishTime(event.getFinishTime()); + task.setTaskStatus(getPre21Value(event.getTaskStatus())); + task.incorporateCounters(((TaskFinished) event.getDatum()).counters); + } + + private void processTaskFailedEvent(TaskFailedEvent event) { + LoggedTask task = + getOrMakeTask(event.getTaskType(), event.getTaskId().toString(), false); + if (task == null) { + return; + } + task.setFinishTime(event.getFinishTime()); + task.setTaskStatus(getPre21Value(event.getTaskStatus())); + } + + private void processTaskAttemptUnsuccessfulCompletionEvent( + TaskAttemptUnsuccessfulCompletionEvent event) { + LoggedTaskAttempt attempt = + getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), + event.getTaskAttemptId().toString()); + 
+ if (attempt == null) { + return; + } + + attempt.setResult(getPre21Value(event.getTaskStatus())); + ParsedHost parsedHost = getAndRecordParsedHost(event.getHostname()); + + if (parsedHost != null) { + attempt.setLocation(parsedHost.makeLoggedLocation()); + } + + attempt.setFinishTime(event.getFinishTime()); + } + + private void processTaskAttemptStartedEvent(TaskAttemptStartedEvent event) { + LoggedTaskAttempt attempt = + getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), + event.getTaskAttemptId().toString()); + if (attempt == null) { + return; + } + attempt.setStartTime(event.getStartTime()); + } + + private void processTaskAttemptFinishedEvent(TaskAttemptFinishedEvent event) { + LoggedTaskAttempt attempt = + getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), + event.getAttemptId().toString()); + if (attempt == null) { + return; + } + attempt.setResult(getPre21Value(event.getTaskStatus())); + attempt.setLocation(getAndRecordParsedHost(event.getHostname()) + .makeLoggedLocation()); + attempt.setFinishTime(event.getFinishTime()); + attempt + .incorporateCounters(((TaskAttemptFinished) event.getDatum()).counters); + } + + private void processReduceAttemptFinishedEvent( + ReduceAttemptFinishedEvent event) { + LoggedTaskAttempt attempt = + getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), + event.getAttemptId().toString()); + if (attempt == null) { + return; + } + attempt.setResult(getPre21Value(event.getTaskStatus())); + attempt.setHostName(event.getHostname()); + // XXX There may be redundant location info available in the event. + // We might consider extracting it from this event. Currently this + // is redundant, but making this will add future-proofing. + attempt.setFinishTime(event.getFinishTime()); + attempt.setShuffleFinished(event.getShuffleFinishTime()); + attempt.setSortFinished(event.getSortFinishTime()); + attempt + .incorporateCounters(((ReduceAttemptFinished) event.getDatum()).counters); + } + + private void processMapAttemptFinishedEvent(MapAttemptFinishedEvent event) { + LoggedTaskAttempt attempt = + getOrMakeTaskAttempt(event.getTaskType(), event.getTaskId().toString(), + event.getAttemptId().toString()); + if (attempt == null) { + return; + } + attempt.setResult(getPre21Value(event.getTaskStatus())); + attempt.setHostName(event.getHostname()); + // XXX There may be redundant location info available in the event. + // We might consider extracting it from this event. Currently this + // is redundant, but making this will add future-proofing. 
+ attempt.setFinishTime(event.getFinishTime()); + attempt + .incorporateCounters(((MapAttemptFinished) event.getDatum()).counters); + } + + private void processJobUnsuccessfulCompletionEvent( + JobUnsuccessfulCompletionEvent event) { + result.setOutcome(Pre21JobHistoryConstants.Values + .valueOf(event.getStatus())); + result.setFinishTime(event.getFinishTime()); + } + + private void processJobSubmittedEvent(JobSubmittedEvent event) { + result.setJobID(event.getJobId().toString()); + result.setJobName(event.getJobName()); + result.setUser(event.getUserName()); + result.setSubmitTime(event.getSubmitTime()); + } + + private void processJobStatusChangedEvent(JobStatusChangedEvent event) { + result.setOutcome(Pre21JobHistoryConstants.Values + .valueOf(event.getStatus())); + } + + private void processJobPriorityChangeEvent(JobPriorityChangeEvent event) { + result.setPriority(LoggedJob.JobPriority.valueOf(event.getPriority() + .toString())); + } + + private void processJobInitedEvent(JobInitedEvent event) { + result.setLaunchTime(event.getLaunchTime()); + result.setTotalMaps(event.getTotalMaps()); + result.setTotalReduces(event.getTotalReduces()); + } + + private void processJobInfoChangeEvent(JobInfoChangeEvent event) { + result.setLaunchTime(event.getLaunchTime()); + } + + private void processJobFinishedEvent(JobFinishedEvent event) { + result.setFinishTime(event.getFinishTime()); + result.setJobID(jobID); + result.setOutcome(Values.SUCCESS); + } + + private LoggedTask getTask(String taskIDname) { + LoggedTask result = mapTasks.get(taskIDname); + + if (result != null) { + return result; + } + + result = reduceTasks.get(taskIDname); + + if (result != null) { + return result; + } + + return otherTasks.get(taskIDname); + } + + /** + * @param type + * the task type + * @param taskIDname + * the task ID name, as a string + * @param allowCreate + * if true, we can create a task. 
+ * @return + */ + private LoggedTask getOrMakeTask(TaskType type, String taskIDname, + boolean allowCreate) { + Map taskMap = otherTasks; + List tasks = this.result.getOtherTasks(); + + switch (type) { + case MAP: + taskMap = mapTasks; + tasks = this.result.getMapTasks(); + break; + + case REDUCE: + taskMap = reduceTasks; + tasks = this.result.getReduceTasks(); + break; + + default: + // no code + } + + LoggedTask result = taskMap.get(taskIDname); + + if (result == null && allowCreate) { + result = new LoggedTask(); + result.setTaskType(getPre21Value(type.toString())); + result.setTaskID(taskIDname); + taskMap.put(taskIDname, result); + tasks.add(result); + } + + return result; + } + + private LoggedTaskAttempt getOrMakeTaskAttempt(TaskType type, + String taskIDName, String taskAttemptName) { + LoggedTask task = getOrMakeTask(type, taskIDName, false); + LoggedTaskAttempt result = attempts.get(taskAttemptName); + + if (result == null && task != null) { + result = new LoggedTaskAttempt(); + result.setAttemptID(taskAttemptName); + attempts.put(taskAttemptName, result); + task.getAttempts().add(result); + } + + return result; + } + + private ParsedHost getAndRecordParsedHost(String hostName) { + ParsedHost result = ParsedHost.parse(hostName); + + if (result != null) { + ParsedHost canonicalResult = allHosts.get(result); + + if (canonicalResult != null) { + return canonicalResult; + } + + allHosts.put(result, result); + + return result; + } + + return null; + } + + private ArrayList preferredLocationForSplits(String splits) { + if (splits != null) { + ArrayList locations = null; + + StringTokenizer tok = new StringTokenizer(splits, ",", false); + + if (tok.countTokens() <= MAXIMUM_PREFERRED_LOCATIONS) { + locations = new ArrayList(); + + while (tok.hasMoreTokens()) { + String nextSplit = tok.nextToken(); + + ParsedHost node = getAndRecordParsedHost(nextSplit); + + if (locations != null && node != null) { + locations.add(node.makeLoggedLocation()); + } + } + + return locations; + } + } + + return null; + } +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfPropertyNames.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen; + +import org.apache.hadoop.mapreduce.JobContext; + +public enum JobConfPropertyNames { + QUEUE_NAMES("mapred.job.queue.name", JobContext.QUEUE_NAME), JOB_NAMES( + JobContext.JOB_NAME), TASK_JAVA_OPTS_S("mapred.child.java.opts"), + MAP_JAVA_OPTS_S("mapred.child.java.opts", JobContext.MAP_JAVA_OPTS), + REDUCE_JAVA_OPTS_S("mapred.child.java.opts", JobContext.REDUCE_JAVA_OPTS); + + private String[] candidates; + + JobConfPropertyNames(String... candidates) { + this.candidates = candidates; + } + + public String[] getCandidates() { + return candidates; + } +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobConfigurationParser.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.w3c.dom.Document; +import org.w3c.dom.Element; +import org.w3c.dom.NodeList; +import org.w3c.dom.Node; +import org.w3c.dom.Text; +import org.xml.sax.SAXException; + +/** + * {@link JobConfigurationParser} parses the job configuration xml file, and + * extracts various framework specific properties. It parses the file using a + * stream-parser and thus is more memory efficient. [This optimization may be + * postponed for a future release] + */ +public class JobConfigurationParser { + final private Set interested; + + /** + * Constructor + * + * @param interested + * properties we should extract from the job configuration xml. + */ + public JobConfigurationParser(List interested) { + this.interested = new HashSet(interested); + } + + /** + * Parse the job configuration file (as an input stream) and return a + * {@link Properties} collection. The input stream will not be closed after + * return from the call. + * + * @param input + * The input data. + * @return A {@link Properties} collection extracted from the job + * configuration xml. 
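+ * <p>
+ * Illustrative use only (the file name below is hypothetical; per the
+ * contract above, the caller still owns and closes the stream):
+ * <pre>
+ *   List interested = Arrays.asList(JobConfPropertyNames.QUEUE_NAMES.getCandidates());
+ *   JobConfigurationParser jcp = new JobConfigurationParser(interested);
+ *   InputStream in = new FileInputStream("job-conf.xml");
+ *   try {
+ *     Properties props = jcp.parse(in);
+ *   } finally {
+ *     in.close();
+ *   }
+ * </pre>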
+ * @throws IOException + */ + Properties parse(InputStream input) throws IOException { + Properties result = new Properties(); + + try { + DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance(); + + DocumentBuilder db = dbf.newDocumentBuilder(); + + Document doc = db.parse(input); + + Element root = doc.getDocumentElement(); + + if (!"configuration".equals(root.getTagName())) { + System.out.print("root is not a configuration node"); + return null; + } + + NodeList props = root.getChildNodes(); + + for (int i = 0; i < props.getLength(); ++i) { + Node propNode = props.item(i); + if (!(propNode instanceof Element)) + continue; + Element prop = (Element) propNode; + if (!"property".equals(prop.getTagName())) { + System.out.print("bad conf file: element not "); + } + NodeList fields = prop.getChildNodes(); + String attr = null; + String value = null; + @SuppressWarnings("unused") + boolean finalParameter = false; + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) { + continue; + } + + Element field = (Element) fieldNode; + if ("name".equals(field.getTagName()) && field.hasChildNodes()) { + attr = ((Text) field.getFirstChild()).getData().trim(); + } + if ("value".equals(field.getTagName()) && field.hasChildNodes()) { + value = ((Text) field.getFirstChild()).getData(); + } + if ("final".equals(field.getTagName()) && field.hasChildNodes()) { + finalParameter = + "true".equals(((Text) field.getFirstChild()).getData()); + } + } + + if (interested.contains(attr) && value != null) { + result.put(attr, value); + } + } + } catch (ParserConfigurationException e) { + return null; + } catch (SAXException e) { + return null; + } + + return result; + } +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParser.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; + +/** + * {@link JobHistoryParser} defines the interface of a Job History file parser. + */ +public interface JobHistoryParser extends Closeable { + /** + * Get the next {@link HistoryEvent} + * @return the next {@link HistoryEvent}. If no more events left, return null. 
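+ * <p>
+ * A typical read loop (sketch only; obtaining the parser instance, e.g.
+ * through {@link JobHistoryParserFactory}, and the consumer of the events,
+ * e.g. a {@link JobBuilder}, are up to the caller):
+ * <pre>
+ *   HistoryEvent event;
+ *   while ((event = parser.nextEvent()) != null) {
+ *     builder.process(event);
+ *   }
+ *   parser.close();    // JobHistoryParser extends Closeable
+ * </pre>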
+ * @throws IOException + */ + HistoryEvent nextEvent() throws IOException; + +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JobHistoryParserFactory.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.IOException; +import java.io.InputStream; + +/** + * {@link JobHistoryParserFactory} is a singleton class that attempts to + * determine the version of job history and return a proper parser. + */ +public class JobHistoryParserFactory { + public static JobHistoryParser getParser(RewindableInputStream ris) + throws IOException { + for (VersionDetector vd : VersionDetector.values()) { + boolean canParse = vd.canParse(ris); + ris.rewind(); + if (canParse) { + return vd.newInstance(ris); + } + } + + throw new IOException("No suitable parser."); + } + + enum VersionDetector { + Hadoop20() { + + @Override + public boolean canParse(InputStream input) throws IOException { + return Hadoop20JHParser.canParse(input); + } + + @Override + public JobHistoryParser newInstance(InputStream input) throws IOException { + return new Hadoop20JHParser(input); + } + }, + + Current() { + + @Override + public boolean canParse(InputStream input) throws IOException { + return CurrentJHParser.canParse(input); + } + + @Override + public JobHistoryParser newInstance(InputStream input) throws IOException { + return new CurrentJHParser(input); + } + }; + + abstract JobHistoryParser newInstance(InputStream input) throws IOException; + + abstract boolean canParse(InputStream input) throws IOException; + } +} Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=911519&r1=911518&r2=911519&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Thu Feb 18 18:43:28 2010 @@ -23,13 +23,7 @@ import java.io.InputStream; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CodecPool; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.CompressionCodecFactory; -import org.apache.hadoop.io.compress.Decompressor; import org.codehaus.jackson.JsonParser; import org.codehaus.jackson.map.DeserializationConfig; import org.codehaus.jackson.map.ObjectMapper; @@ -44,7 +38,6 @@ private final ObjectMapper mapper; private final Class clazz; private final JsonParser jsonParser; - private final Decompressor decompressor; /** * Constructor. @@ -60,17 +53,7 @@ mapper.configure( DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); this.clazz = clazz; - FileSystem fs = path.getFileSystem(conf); - CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(path); - InputStream input; - if (codec == null) { - input = fs.open(path); - decompressor = null; - } else { - FSDataInputStream fsdis = fs.open(path); - decompressor = CodecPool.getDecompressor(codec); - input = codec.createInputStream(fsdis, decompressor); - } + InputStream input = new PossiblyDecompressedInputStream(path, conf); jsonParser = mapper.getJsonFactory().createJsonParser(input); } @@ -86,7 +69,6 @@ mapper.configure( DeserializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); this.clazz = clazz; - decompressor = null; jsonParser = mapper.getJsonFactory().createJsonParser(input); } @@ -107,12 +89,6 @@ @Override public void close() throws IOException { - try { - jsonParser.close(); - } finally { - if (decompressor != null) { - CodecPool.returnDecompressor(decompressor); - } - } + jsonParser.close(); } } Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperWriter.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; + +import org.codehaus.jackson.JsonEncoding; +import org.codehaus.jackson.JsonGenerator; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; + +/** + * Simple wrapper around {@link JsonGenerator} to write objects in JSON format. + * @param The type of the objects to be written. 
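+ * <p>
+ * Usage sketch (the output stream and the object written are placeholders,
+ * not defined by this class):
+ * <pre>
+ *   JsonObjectMapperWriter writer = new JsonObjectMapperWriter(out, true);
+ *   writer.write(loggedJob);    // any Jackson-serializable object
+ *   writer.close();
+ * </pre>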
+ */ +public class JsonObjectMapperWriter implements Closeable { + private JsonGenerator writer; + + public JsonObjectMapperWriter(OutputStream output, boolean prettyPrint) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + mapper.configure( + SerializationConfig.Feature.CAN_OVERRIDE_ACCESS_MODIFIERS, true); + mapper.getJsonFactory(); + writer = mapper.getJsonFactory().createJsonGenerator( + output, JsonEncoding.UTF8); + if (prettyPrint) { + writer.useDefaultPrettyPrinter(); + } + } + + public void write(T object) throws IOException { + writer.writeObject(object); + } + + @Override + public void close() throws IOException { + writer.close(); + } +} Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java?rev=911519&r1=911518&r2=911519&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedNetworkTopology.java Thu Feb 18 18:43:28 2010 @@ -82,7 +82,7 @@ * @param level * the level number */ - LoggedNetworkTopology(HashSet hosts, String name, int level) { + LoggedNetworkTopology(Set hosts, String name, int level) { this.name = name; this.children = null; @@ -119,6 +119,10 @@ } } + LoggedNetworkTopology(Set hosts) { + this(hosts, "", 0); + } + public String getName() { return name; } Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=911519&r1=911518&r2=911519&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Thu Feb 18 18:43:28 2010 @@ -23,6 +23,11 @@ import java.util.Set; import java.util.TreeSet; +import org.apache.hadoop.mapreduce.jobhistory.Events; +import org.apache.hadoop.mapreduce.jobhistory.JhCounter; +import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup; +import org.apache.hadoop.mapreduce.jobhistory.JhCounters; + import org.codehaus.jackson.annotate.JsonAnySetter; /** @@ -47,9 +52,6 @@ List attempts = new ArrayList(); List preferredLocations = Collections.emptyList(); - int numberMaps = -1; - int numberReduces = -1; - static private Set alreadySeenAnySetterAttributes = new TreeSet(); @@ -157,22 +159,6 @@ } } - public int getNumberMaps() { - return numberMaps; - } - - void setNumberMaps(int numberMaps) { - this.numberMaps = numberMaps; - } - - public int getNumberReduces() { - return numberReduces; - } - - void setNumberReduces(int numberReduces) { - this.numberReduces = numberReduces; - } - public Pre21JobHistoryConstants.Values getTaskStatus() { return taskStatus; } @@ -189,6 +175,110 @@ this.taskType = taskType; } + private void incorporateMapCounters(JhCounters counters) { + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + task.inputBytes = val; + } + }, counters, "HDFS_BYTES_READ"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + task.outputBytes = val; + } + }, counters, "FILE_BYTES_WRITTEN"); + incorporateCounter(new SetField(this) { + @Override + void set(long 
val) { + task.inputRecords = val; + } + }, counters, "MAP_INPUT_RECORDS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + task.outputRecords = val; + } + }, counters, "MAP_OUTPUT_RECORDS"); + } + + private void incorporateReduceCounters(JhCounters counters) { + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + task.inputBytes = val; + } + }, counters, "REDUCE_SHUFFLE_BYTES"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + task.outputBytes = val; + } + }, counters, "HDFS_BYTES_WRITTEN"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + task.inputRecords = val; + } + }, counters, "REDUCE_INPUT_RECORDS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + task.outputRecords = val; + } + }, counters, "REDUCE_OUTPUT_RECORDS"); + } + + // incorporate event counters + // LoggedTask MUST KNOW ITS TYPE BEFORE THIS CALL + public void incorporateCounters(JhCounters counters) { + switch (taskType) { + case MAP: + incorporateMapCounters(counters); + return; + case REDUCE: + incorporateReduceCounters(counters); + return; + // NOT exhaustive + } + } + + private static String canonicalizeCounterName(String nonCanonicalName) { + String result = nonCanonicalName.toLowerCase(); + + result = result.replace(' ', '|'); + result = result.replace('-', '|'); + result = result.replace('_', '|'); + result = result.replace('.', '|'); + + return result; + } + + private abstract class SetField { + LoggedTask task; + + SetField(LoggedTask task) { + this.task = task; + } + + abstract void set(long value); + } + + private static void incorporateCounter(SetField thunk, JhCounters counters, + String counterName) { + counterName = canonicalizeCounterName(counterName); + + for (JhCounterGroup group : counters.groups) { + for (JhCounter counter : group.counts) { + if (counterName + .equals(canonicalizeCounterName(counter.name.toString()))) { + thunk.set(counter.value); + return; + } + } + } + } + private void compare1(long c1, long c2, TreePath loc, String eltname) throws DeepInequalityException { if (c1 != c2) { Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=911519&r1=911518&r2=911519&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Thu Feb 18 18:43:28 2010 @@ -28,6 +28,11 @@ // the Jackson implementation of JSON doesn't handle a // superclass-valued field. +import org.apache.hadoop.mapreduce.jobhistory.Events; +import org.apache.hadoop.mapreduce.jobhistory.JhCounter; +import org.apache.hadoop.mapreduce.jobhistory.JhCounterGroup; +import org.apache.hadoop.mapreduce.jobhistory.JhCounters; + /** * A {@link LoggedTaskAttempt} represents an attempt to run an hadoop task in a * hadoop job. Note that a task can have several attempts. @@ -140,7 +145,7 @@ } void setHostName(String hostName) { - this.hostName = hostName.intern(); + this.hostName = hostName == null ? 
null : hostName.intern(); } public long getHdfsBytesRead() { @@ -263,6 +268,130 @@ this.mapInputBytes = mapInputBytes; } + // incorporate event counters + public void incorporateCounters(JhCounters counters) { + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.hdfsBytesRead = val; + } + }, counters, "HDFS_BYTES_READ"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.hdfsBytesWritten = val; + } + }, counters, "HDFS_BYTES_WRITTEN"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.fileBytesRead = val; + } + }, counters, "FILE_BYTES_READ"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.fileBytesWritten = val; + } + }, counters, "FILE_BYTES_WRITTEN"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.mapInputBytes = val; + } + }, counters, "MAP_INPUT_BYTES"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.mapInputRecords = val; + } + }, counters, "MAP_INPUT_RECORDS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.mapOutputBytes = val; + } + }, counters, "MAP_OUTPUT_BYTES"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.mapOutputRecords = val; + } + }, counters, "MAP_OUTPUT_RECORDS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.combineInputRecords = val; + } + }, counters, "COMBINE_INPUT_RECORDS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.reduceInputGroups = val; + } + }, counters, "REDUCE_INPUT_GROUPS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.reduceInputRecords = val; + } + }, counters, "REDUCE_INPUT_RECORDS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.reduceShuffleBytes = val; + } + }, counters, "REDUCE_SHUFFLE_BYTES"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.reduceOutputRecords = val; + } + }, counters, "REDUCE_OUTPUT_RECORDS"); + incorporateCounter(new SetField(this) { + @Override + void set(long val) { + attempt.spilledRecords = val; + } + }, counters, "SPILLED_RECORDS"); + } + + private static String canonicalizeCounterName(String nonCanonicalName) { + String result = nonCanonicalName.toLowerCase(); + + result = result.replace(' ', '|'); + result = result.replace('-', '|'); + result = result.replace('_', '|'); + result = result.replace('.', '|'); + + return result; + } + + private abstract class SetField { + LoggedTaskAttempt attempt; + + SetField(LoggedTaskAttempt attempt) { + this.attempt = attempt; + } + + abstract void set(long value); + } + + private static void incorporateCounter(SetField thunk, JhCounters counters, + String counterName) { + counterName = canonicalizeCounterName(counterName); + + for (JhCounterGroup group : counters.groups) { + for (JhCounter counter : group.counts) { + if (counterName + .equals(canonicalizeCounterName(counter.name.toString()))) { + thunk.set(counter.value); + return; + } + } + } + } + private void compare1(String c1, String c2, TreePath loc, String eltname) throws DeepInequalityException { if (c1 == null && c2 == null) { Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/MapAttempt20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent; + +public class MapAttempt20LineHistoryEventEmitter extends + TaskAttempt20LineEventEmitter { + + static List nonFinals = + new LinkedList(); + static List finals = new LinkedList(); + + static { + nonFinals.addAll(taskEventNonFinalSEEs); + + finals.add(new MapAttemptFinishedEventEmitter()); + } + + protected MapAttempt20LineHistoryEventEmitter() { + super(); + } + + static private class MapAttemptFinishedEventEmitter extends + SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, + HistoryEventEmitter thatg) { + if (taskAttemptIDName == null) { + return null; + } + + TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); + + String finishTime = line.get("FINISH_TIME"); + String status = line.get("TASK_STATUS"); + + if (finishTime != null && status != null + && status.equalsIgnoreCase("success")) { + + try { + String hostName = line.get("HOSTNAME"); + String counters = line.get("COUNTERS"); + String state = line.get("STATE_STRING"); + + MapAttempt20LineHistoryEventEmitter that = + (MapAttempt20LineHistoryEventEmitter) thatg; + + if (finishTime != null && "success".equalsIgnoreCase(status)) { + return new MapAttemptFinishedEvent(taskAttemptID, + that.originalTaskType, status, Long.parseLong(finishTime), Long + .parseLong(finishTime), hostName, state, + parseCounters(counters)); + } + } catch (ParseException e) { + // no code + } + } + + return null; + } + } + + @Override + List finalSEEs() { + return finals; + } + + @Override + List nonFinalSEEs() { + return nonFinals; + } + +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java?rev=911519&view=auto 
============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Outputter.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * Interface to output a sequence of objects of type T. + */ +public interface Outputter extends Closeable { + /** + * Initialize the {@link Outputter} to a specific path. + * @param path The {@link Path} to the output file. + * @param conf Configuration + * @throws IOException + */ + public void init(Path path, Configuration conf) throws IOException; + + /** + * Output an object. + * @param object The objecte. + * @throws IOException + */ + public void output(T object) throws IOException; + +} \ No newline at end of file Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java?rev=911519&r1=911518&r2=911519&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedConfigFile.java Thu Feb 18 18:43:28 2010 @@ -39,8 +39,10 @@ import org.xml.sax.SAXException; class ParsedConfigFile { - private static final Pattern jobIDPattern = Pattern.compile("_(job_[0-9]+_[0-9]+)_"); - private static final Pattern heapPattern = Pattern.compile("-Xmx([0-9]+)([mMgG])"); + private static final Pattern jobIDPattern = + Pattern.compile("_(job_[0-9]+_[0-9]+)_"); + private static final Pattern heapPattern = + Pattern.compile("-Xmx([0-9]+)([mMgG])"); final int heapMegabytes; @@ -70,6 +72,7 @@ } @SuppressWarnings("hiding") + @Deprecated ParsedConfigFile(String filenameLine, String xmlString) { super(); @@ -138,10 +141,11 @@ value = ((Text) field.getFirstChild()).getData(); } if ("final".equals(field.getTagName()) && field.hasChildNodes()) { - finalParameter = "true".equals(((Text) field.getFirstChild()) - .getData()); + finalParameter = + "true".equals(((Text) field.getFirstChild()).getData()); } } + if ("mapred.child.java.opts".equals(attr) && value != null) { Matcher matcher = heapPattern.matcher(value); if (matcher.find()) { @@ -163,14 +167,16 @@ jobName = value; } - clusterMapMB = maybeGetIntValue(MRConfig.MAPMEMORY_MB, attr, - value, clusterMapMB); - clusterReduceMB = 
maybeGetIntValue(MRConfig.REDUCEMEMORY_MB, - attr, value, clusterReduceMB); - jobMapMB = maybeGetIntValue(JobContext.MAP_MEMORY_MB, attr, value, - jobMapMB); - jobReduceMB = maybeGetIntValue(JobContext.REDUCE_MEMORY_MB, attr, - value, jobReduceMB); + clusterMapMB = + maybeGetIntValue(MRConfig.MAPMEMORY_MB, attr, value, clusterMapMB); + clusterReduceMB = + maybeGetIntValue(MRConfig.REDUCEMEMORY_MB, attr, value, + clusterReduceMB); + jobMapMB = + maybeGetIntValue(JobContext.MAP_MEMORY_MB, attr, value, jobMapMB); + jobReduceMB = + maybeGetIntValue(JobContext.REDUCE_MEMORY_MB, attr, value, + jobReduceMB); } valid = true; Modified: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java?rev=911519&r1=911518&r2=911519&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java (original) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ParsedLine.java Thu Feb 18 18:43:28 2010 @@ -24,10 +24,10 @@ Properties content; LogRecordType type; - static final Pattern keyValPair = Pattern - .compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\""); + static final Pattern keyValPair = + Pattern.compile(" *([a-zA-Z0-9_]+)=\"((?:[^\"\\\\]|\\\\[ .\"\\\\])*)\""); - @SuppressWarnings("unused") + @SuppressWarnings("unused") ParsedLine(String fullLine, int version) { super(); @@ -47,8 +47,16 @@ String propValPairs = fullLine.substring(firstSpace + 1); - while (propValPairs.length() > 0 && propValPairs.charAt(0) == ' ') { - propValPairs = propValPairs.substring(1); + int pvPairsFirstNonBlank = 0; + int pvPairsLength = propValPairs.length(); + + while (pvPairsLength > pvPairsFirstNonBlank + && propValPairs.charAt(pvPairsFirstNonBlank) == ' ') { + ++pvPairsFirstNonBlank; + } + + if (pvPairsFirstNonBlank != 0) { + propValPairs = propValPairs.substring(pvPairsFirstNonBlank); } int cursor = 0; Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/PossiblyDecompressedInputStream.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CodecPool; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.io.compress.Decompressor; + +class PossiblyDecompressedInputStream extends InputStream { + private final Decompressor decompressor; + private final InputStream coreInputStream; + + public PossiblyDecompressedInputStream(Path inputPath, Configuration conf) + throws IOException { + CompressionCodecFactory codecs = new CompressionCodecFactory(conf); + CompressionCodec inputCodec = codecs.getCodec(inputPath); + + FileSystem ifs = inputPath.getFileSystem(conf); + FSDataInputStream fileIn = ifs.open(inputPath); + + if (inputCodec == null) { + decompressor = null; + coreInputStream = fileIn; + } else { + decompressor = CodecPool.getDecompressor(inputCodec); + coreInputStream = inputCodec.createInputStream(fileIn, decompressor); + } + } + + @Override + public int read() throws IOException { + return coreInputStream.read(); + } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + return coreInputStream.read(buffer, offset, length); + } + + @Override + public void close() throws IOException { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + } + + coreInputStream.close(); + } + +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/ReduceAttempt20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent; + +public class ReduceAttempt20LineHistoryEventEmitter extends + TaskAttempt20LineEventEmitter { + + static List nonFinals = + new LinkedList(); + static List finals = new LinkedList(); + + static { + nonFinals.addAll(taskEventNonFinalSEEs); + + finals.add(new ReduceAttemptFinishedEventEmitter()); + } + + ReduceAttempt20LineHistoryEventEmitter() { + super(); + } + + static private class ReduceAttemptFinishedEventEmitter extends + SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, + HistoryEventEmitter thatg) { + if (taskAttemptIDName == null) { + return null; + } + + TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); + + String finishTime = line.get("FINISH_TIME"); + String status = line.get("TASK_STATUS"); + + if (finishTime != null && status != null + && status.equalsIgnoreCase("success")) { + + try { + String hostName = line.get("HOSTNAME"); + String counters = line.get("COUNTERS"); + String state = line.get("STATE_STRING"); + String shuffleFinish = line.get("SHUFFLE_FINISHED"); + String sortFinish = line.get("SORT_FINISHED"); + + if (finishTime != null && shuffleFinish != null && sortFinish != null + && "success".equalsIgnoreCase(status)) { + ReduceAttempt20LineHistoryEventEmitter that = + (ReduceAttempt20LineHistoryEventEmitter) thatg; + + return new ReduceAttemptFinishedEvent(taskAttemptID, + that.originalTaskType, status, Long.parseLong(shuffleFinish), + Long.parseLong(sortFinish), Long.parseLong(finishTime), + hostName, state, parseCounters(counters)); + } + } catch (ParseException e) { + // no code + } + } + + return null; + } + } + + @Override + List finalSEEs() { + return finals; + } + + @Override + List nonFinalSEEs() { + return nonFinals; + } + +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/RewindableInputStream.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.tools.rumen; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * A simple wrapper class to make any input stream "rewindable". It could be + * made more memory efficient by growing the internal buffer adaptively. + */ +public class RewindableInputStream extends InputStream { + private InputStream input; + + /** + * Constructor. Remembers up to the first 1 MB of the stream. + * + * @param input input stream. + */ + public RewindableInputStream(InputStream input) { + this(input, 1024 * 1024); + } + + /** + * Constructor. + * + * @param input + * input stream. + * @param maxBytesToRemember + * Maximum number of bytes we need to remember at the beginning of + * the stream. If {@link #rewind()} is called after so many bytes are + * read from the stream, {@link #rewind()} would fail. + */ + public RewindableInputStream(InputStream input, int maxBytesToRemember) { + this.input = new BufferedInputStream(input, maxBytesToRemember); + this.input.mark(maxBytesToRemember); + } + + @Override + public int read() throws IOException { + return input.read(); + } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + return input.read(buffer, offset, length); + } + + @Override + public void close() throws IOException { + input.close(); + } + + public InputStream rewind() throws IOException { + try { + input.reset(); + return this; + } catch (IOException e) { + throw new IOException("Unable to rewind the stream", e); + } + } +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/SingleEventEmitter.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.tools.rumen; + +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; + +abstract class SingleEventEmitter { + abstract HistoryEvent maybeEmitEvent(ParsedLine line, String name, + HistoryEventEmitter that); +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.text.ParseException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent; + +public class Task20LineHistoryEventEmitter extends HistoryEventEmitter { + + static List nonFinals = + new LinkedList(); + static List finals = new LinkedList(); + + Long originalStartTime = null; + TaskType originalTaskType = null; + + static { + nonFinals.add(new TaskStartedEventEmitter()); + nonFinals.add(new TaskUpdatedEventEmitter()); + + finals.add(new TaskFinishedEventEmitter()); + finals.add(new TaskFailedEventEmitter()); + } + + protected Task20LineHistoryEventEmitter() { + super(); + } + + static private class TaskStartedEventEmitter extends SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName, + HistoryEventEmitter thatg) { + if (taskIDName == null) { + return null; + } + + TaskID taskID = TaskID.forName(taskIDName); + + String taskType = line.get("TASK_TYPE"); + String startTime = line.get("START_TIME"); + String splits = line.get("SPLITS"); + + if (startTime != null && taskType != null) { + Task20LineHistoryEventEmitter that = + (Task20LineHistoryEventEmitter) thatg; + + that.originalStartTime = Long.parseLong(startTime); + that.originalTaskType = + Version20LogInterfaceUtils.get20TaskType(taskType); + + return new TaskStartedEvent(taskID, that.originalStartTime, + that.originalTaskType, 
splits); + } + + return null; + } + } + + static private class TaskUpdatedEventEmitter extends SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName, + HistoryEventEmitter thatg) { + if (taskIDName == null) { + return null; + } + + TaskID taskID = TaskID.forName(taskIDName); + + String finishTime = line.get("FINISH_TIME"); + + if (finishTime != null) { + return new TaskUpdatedEvent(taskID, Long.parseLong(finishTime)); + } + + return null; + } + } + + static private class TaskFinishedEventEmitter extends SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName, + HistoryEventEmitter thatg) { + if (taskIDName == null) { + return null; + } + + TaskID taskID = TaskID.forName(taskIDName); + + String status = line.get("TASK_STATUS"); + String finishTime = line.get("FINISH_TIME"); + + String error = line.get("ERROR"); + + String counters = line.get("COUNTERS"); + + if (finishTime != null && error == null + && (status != null && status.equalsIgnoreCase("success"))) { + Counters eventCounters = null; + + try { + eventCounters = parseCounters(counters); + } catch (ParseException e) { + // no code + } + + Task20LineHistoryEventEmitter that = + (Task20LineHistoryEventEmitter) thatg; + + if (that.originalTaskType == null) { + return null; + } + + return new TaskFinishedEvent(taskID, Long.parseLong(finishTime), + that.originalTaskType, status, eventCounters); + } + + return null; + } + } + + static private class TaskFailedEventEmitter extends SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName, + HistoryEventEmitter thatg) { + if (taskIDName == null) { + return null; + } + + TaskID taskID = TaskID.forName(taskIDName); + + String status = line.get("TASK_STATUS"); + String finishTime = line.get("FINISH_TIME"); + + String taskType = line.get("TASK_TYPE"); + + String error = line.get("ERROR"); + + if (finishTime != null + && (error != null || (status != null && !status + .equalsIgnoreCase("success")))) { + Task20LineHistoryEventEmitter that = + (Task20LineHistoryEventEmitter) thatg; + + TaskType originalTaskType = + that.originalTaskType == null ? Version20LogInterfaceUtils + .get20TaskType(taskType) : that.originalTaskType; + + return new TaskFailedEvent(taskID, Long.parseLong(finishTime), + originalTaskType, error, status, null); + } + + return null; + } + } + + @Override + List finalSEEs() { + return finals; + } + + @Override + List nonFinalSEEs() { + return nonFinals; + } +} Added: hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java?rev=911519&view=auto ============================================================================== --- hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java (added) +++ hadoop/mapreduce/trunk/src/tools/org/apache/hadoop/tools/rumen/TaskAttempt20LineEventEmitter.java Thu Feb 18 18:43:28 2010 @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools.rumen; + +import java.text.ParseException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptFinishedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent; + +public abstract class TaskAttempt20LineEventEmitter extends HistoryEventEmitter { + static List taskEventNonFinalSEEs = + new LinkedList(); + static List taskEventFinalSEEs = + new LinkedList(); + + static private final int DEFAULT_HTTP_PORT = 80; + + Long originalStartTime = null; + org.apache.hadoop.mapreduce.TaskType originalTaskType = null; + + static { + taskEventNonFinalSEEs.add(new TaskAttemptStartedEventEmitter()); + taskEventNonFinalSEEs.add(new TaskAttemptFinishedEventEmitter()); + taskEventNonFinalSEEs + .add(new TaskAttemptUnsuccessfulCompletionEventEmitter()); + } + + protected TaskAttempt20LineEventEmitter() { + super(); + } + + static private class TaskAttemptStartedEventEmitter extends + SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, + HistoryEventEmitter thatg) { + if (taskAttemptIDName == null) { + return null; + } + + TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); + + String startTime = line.get("START_TIME"); + String taskType = line.get("TASK_TYPE"); + String trackerName = line.get("TRACKER_NAME"); + String httpPort = line.get("HTTP_PORT"); + + if (startTime != null && taskType != null) { + TaskAttempt20LineEventEmitter that = + (TaskAttempt20LineEventEmitter) thatg; + + that.originalStartTime = Long.parseLong(startTime); + that.originalTaskType = + Version20LogInterfaceUtils.get20TaskType(taskType); + + int port = + httpPort.equals("") ? 
DEFAULT_HTTP_PORT : Integer + .parseInt(httpPort); + + return new TaskAttemptStartedEvent(taskAttemptID, + that.originalTaskType, that.originalStartTime, trackerName, port); + } + + return null; + } + } + + static private class TaskAttemptFinishedEventEmitter extends + SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, + HistoryEventEmitter thatg) { + if (taskAttemptIDName == null) { + return null; + } + + TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); + + String finishTime = line.get("FINISH_TIME"); + String status = line.get("TASK_STATUS"); + + if (finishTime != null && status != null + && status.equalsIgnoreCase("success")) { + + try { + String hostName = line.get("HOSTNAME"); + String counters = line.get("COUNTERS"); + String state = line.get("STATE_STRING"); + + TaskAttempt20LineEventEmitter that = + (TaskAttempt20LineEventEmitter) thatg; + + return new TaskAttemptFinishedEvent(taskAttemptID, + that.originalTaskType, status, Long.parseLong(finishTime), + hostName, state, parseCounters(counters)); + } catch (ParseException e) { + return null; + } + } + + return null; + } + } + + static private class TaskAttemptUnsuccessfulCompletionEventEmitter extends + SingleEventEmitter { + HistoryEvent maybeEmitEvent(ParsedLine line, String taskAttemptIDName, + HistoryEventEmitter thatg) { + if (taskAttemptIDName == null) { + return null; + } + + TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptIDName); + + String finishTime = line.get("FINISH_TIME"); + String status = line.get("TASK_STATUS"); + + if (finishTime != null && status != null + && !status.equalsIgnoreCase("success")) { + String hostName = line.get("HOSTNAME"); + String error = line.get("ERROR"); + + TaskAttempt20LineEventEmitter that = + (TaskAttempt20LineEventEmitter) thatg; + + return new TaskAttemptUnsuccessfulCompletionEvent(taskAttemptID, + that.originalTaskType, status, Long.parseLong(finishTime), + hostName, error); + } + + return null; + } + } +}
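For readers skimming the new Rumen classes above: the Outputter interface added in this revision is small enough that a complete implementation fits in a few lines. The sketch below is illustrative only and is not part of rev 911519; the class name TextLineOutputter and its placement in the org.apache.hadoop.tools.rumen package are assumptions made for the example, and it relies only on the init/output signatures shown in Outputter.java above. It writes one object per line using toString().

package org.apache.hadoop.tools.rumen; // placement chosen for illustration only

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * A minimal, hypothetical Outputter that writes each object on its own line
 * using toString(). Illustration only; not part of this commit.
 */
class TextLineOutputter<T> implements Outputter<T> {
  private Writer writer;

  public void init(Path path, Configuration conf) throws IOException {
    // Resolve the file system for the target path and open the output file.
    FileSystem fs = path.getFileSystem(conf);
    writer = new OutputStreamWriter(fs.create(path));
  }

  public void output(T object) throws IOException {
    // One object per line; a real outputter would use a proper serializer.
    writer.write(object.toString());
    writer.write('\n');
  }

  public void close() throws IOException {
    if (writer != null) {
      writer.close();
    }
  }
}

A caller would create a TextLineOutputter<LoggedJob>, call init(new Path("jobs.txt"), conf) once, call output(job) for each job produced, and finally close(); the same call sequence applies to any other Outputter implementation.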