Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 45158 invoked from network); 18 Sep 2009 22:23:04 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 18 Sep 2009 22:23:04 -0000 Received: (qmail 53742 invoked by uid 500); 18 Sep 2009 22:23:03 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 53707 invoked by uid 500); 18 Sep 2009 22:23:03 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 53697 invoked by uid 99); 18 Sep 2009 22:23:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Sep 2009 22:23:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Sep 2009 22:23:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8620A23888E7; Fri, 18 Sep 2009 22:22:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r816801 [2/2] - in /hadoop/mapreduce/trunk: ./ ivy/ src/contrib/capacity-scheduler/ src/contrib/fairscheduler/ src/contrib/gridmix/ src/contrib/mrunit/ src/contrib/sqoop/ src/contrib/streaming/ src/java/org/apache/hadoop/mapreduce/ src/java... Date: Fri, 18 Sep 2009 22:22:39 -0000 To: mapreduce-commits@hadoop.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090918222240.8620A23888E7@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Fri Sep 18 22:22:37 2009 @@ -24,42 +24,16 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record successful completion of a reduce attempt * */ public class ReduceAttemptFinishedEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private TaskAttemptID attemptId; - private TaskType taskType; - private String taskStatus; - private long shuffleFinishTime; - private long sortFinishTime; - private long finishTime; - private String hostname; - private String state; - private Counters counters; - - enum EventFields { EVENT_CATEGORY, - TASK_ID, - TASK_ATTEMPT_ID, - TASK_TYPE, - TASK_STATUS, - SHUFFLE_FINISH_TIME, - SORT_FINISH_TIME, - FINISH_TIME, - HOSTNAME, - STATE, - COUNTERS } - - ReduceAttemptFinishedEvent() { - } + private Events.ReduceAttemptFinished datum = + new Events.ReduceAttemptFinished(); /** * Create an event to record completion of a reduce attempt @@ -78,114 +52,53 @@ long shuffleFinishTime, long sortFinishTime, long finishTime, String hostname, String state, Counters counters) { - this.taskid = id.getTaskID(); - this.attemptId = id; - this.taskType = taskType; - this.taskStatus = taskStatus; - this.shuffleFinishTime = shuffleFinishTime; - this.sortFinishTime = sortFinishTime; - this.finishTime = finishTime; - this.hostname = hostname; - this.state = state; - this.counters = counters; - this.category = EventCategory.TASK_ATTEMPT; + datum.taskid = new Utf8(id.getTaskID().toString()); + datum.attemptId = new Utf8(id.toString()); + datum.taskType = new Utf8(taskType.name()); + datum.taskStatus = new Utf8(taskStatus); + datum.shuffleFinishTime = shuffleFinishTime; + datum.sortFinishTime = sortFinishTime; + datum.finishTime = finishTime; + datum.hostname = new Utf8(hostname); + datum.state = new Utf8(state); + datum.counters = EventWriter.toAvro(counters); + } + + ReduceAttemptFinishedEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { + this.datum = (Events.ReduceAttemptFinished)datum; } /** Get the Task ID */ - public TaskID getTaskId() { return taskid; } - /** Get the event category */ - public EventCategory getEventCategory() { return category; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the attempt id */ - public TaskAttemptID getAttemptId() { return attemptId; } + public TaskAttemptID getAttemptId() { + return TaskAttemptID.forName(datum.attemptId.toString()); + } /** Get the task type */ - public TaskType getTaskType() { return taskType; } + public TaskType getTaskType() { + return TaskType.valueOf(datum.taskType.toString()); + } /** Get the task status */ - public String getTaskStatus() { return taskStatus; } + public String getTaskStatus() { return datum.taskStatus.toString(); } /** Get the finish time of the sort phase */ - public long getSortFinishTime() { return sortFinishTime; } + public long getSortFinishTime() { return datum.sortFinishTime; } /** Get the finish time of the shuffle phase */ - public long getShuffleFinishTime() { return shuffleFinishTime; } + public long getShuffleFinishTime() { return datum.shuffleFinishTime; } /** Get the finish time of the attempt */ - public long getFinishTime() { return finishTime; } + public long getFinishTime() { return datum.finishTime; } /** Get the name of the host where the attempt ran */ - public String getHostname() { return hostname; } + public String getHostname() { return datum.hostname.toString(); } /** Get the state string */ - public String getState() { return state; } + public String getState() { return datum.state.toString(); } /** Get the counters for the attempt */ - public Counters getCounters() { return counters; } + Counters getCounters() { return EventReader.fromAvro(datum.counters); } /** Get the event type */ - public EventType getEventType() { - return EventType.REDUCE_ATTEMPT_FINISHED; + public Events.EventType getEventType() { + return Events.EventType.REDUCE_ATTEMPT_FINISHED; } - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldname = jp.getCurrentName(); - jp.nextToken(); // move to value - switch (Enum.valueOf(EventFields.class, fieldname)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case TASK_ATTEMPT_ID: - attemptId = TaskAttemptID.forName(jp.getText()); - break; - case TASK_TYPE: - taskType = TaskType.valueOf(jp.getText()); - break; - case TASK_STATUS: - taskStatus = jp.getText(); - break; - case SHUFFLE_FINISH_TIME: - shuffleFinishTime = jp.getLongValue(); - break; - case SORT_FINISH_TIME: - sortFinishTime = jp.getLongValue(); - break; - case FINISH_TIME: - finishTime = jp.getLongValue(); - break; - case HOSTNAME: - hostname = jp.getText(); - break; - case STATE: - state = jp.getText(); - break; - case COUNTERS: - counters = EventReader.readCounters(jp); - break; - default: - throw new IOException("Unrecognized field '"+fieldname+"'!"); - } - } - } - - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(), - attemptId.toString()); - gen.writeStringField(EventFields.TASK_TYPE.toString(), - taskType.toString()); - gen.writeStringField(EventFields.TASK_STATUS.toString(), - taskStatus); - gen.writeNumberField(EventFields.SHUFFLE_FINISH_TIME.toString(), - shuffleFinishTime); - gen.writeNumberField(EventFields.SORT_FINISH_TIME.toString(), - sortFinishTime); - gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime); - gen.writeStringField(EventFields.HOSTNAME.toString(), hostname); - gen.writeStringField(EventFields.STATE.toString(), state); - EventWriter.writeCounters(counters, gen); - gen.writeEndObject(); - } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java Fri Sep 18 22:22:37 2009 @@ -24,38 +24,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record successful task completion * */ public class TaskAttemptFinishedEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private TaskAttemptID attemptId; - private TaskType taskType; - private String taskStatus; - private long finishTime; - private String hostname; - private String state; - private Counters counters; - - enum EventFields { EVENT_CATEGORY, - TASK_ID, - TASK_ATTEMPT_ID, - TASK_TYPE, - TASK_STATUS, - FINISH_TIME, - HOSTNAME, - STATE, - COUNTERS } - - TaskAttemptFinishedEvent() { - } + private Events.TaskAttemptFinished datum = new Events.TaskAttemptFinished(); /** * Create an event to record successful finishes for setup and cleanup @@ -72,97 +49,46 @@ TaskType taskType, String taskStatus, long finishTime, String hostname, String state, Counters counters) { - this.taskid = id.getTaskID(); - this.attemptId = id; - this.taskType = taskType; - this.taskStatus = taskStatus; - this.finishTime = finishTime; - this.hostname = hostname; - this.state = state; - this.counters = counters; - this.category = EventCategory.TASK_ATTEMPT; + datum.taskid = new Utf8(id.getTaskID().toString()); + datum.attemptId = new Utf8(id.toString()); + datum.taskType = new Utf8(taskType.name()); + datum.taskStatus = new Utf8(taskStatus); + datum.finishTime = finishTime; + datum.hostname = new Utf8(hostname); + datum.state = new Utf8(state); + datum.counters = EventWriter.toAvro(counters); + } + + TaskAttemptFinishedEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { + this.datum = (Events.TaskAttemptFinished)datum; } /** Get the task ID */ - public TaskID getTaskId() { return taskid; } - /** Get the event category */ - public EventCategory getEventCategory() { return category; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the task attempt id */ - public TaskAttemptID getAttemptId() { return attemptId; } + public TaskAttemptID getAttemptId() { + return TaskAttemptID.forName(datum.attemptId.toString()); + } /** Get the task type */ - public TaskType getTaskType() { return taskType; } + public TaskType getTaskType() { + return TaskType.valueOf(datum.taskType.toString()); + } /** Get the task status */ - public String getTaskStatus() { return taskStatus; } + public String getTaskStatus() { return datum.taskStatus.toString(); } /** Get the attempt finish time */ - public long getFinishTime() { return finishTime; } + public long getFinishTime() { return datum.finishTime; } /** Get the host where the attempt executed */ - public String getHostname() { return hostname; } + public String getHostname() { return datum.hostname.toString(); } /** Get the state string */ - public String getState() { return state; } + public String getState() { return datum.state.toString(); } /** Get the counters for the attempt */ - public Counters getCounters() { return counters; } + Counters getCounters() { return EventReader.fromAvro(datum.counters); } /** Get the event type */ - public EventType getEventType() { - return EventType.MAP_ATTEMPT_FINISHED; + public Events.EventType getEventType() { + return Events.EventType.MAP_ATTEMPT_FINISHED; } - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldname = jp.getCurrentName(); - jp.nextToken(); // move to value - switch (Enum.valueOf(EventFields.class, fieldname)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case TASK_ATTEMPT_ID: - attemptId = TaskAttemptID.forName(jp.getText()); - break; - case TASK_TYPE: - taskType = TaskType.valueOf(jp.getText()); - break; - case TASK_STATUS: - taskStatus = jp.getText(); - break; - case FINISH_TIME: - finishTime = jp.getLongValue(); - break; - case HOSTNAME: - hostname = jp.getText(); - break; - case STATE: - state = jp.getText(); - break; - case COUNTERS: - counters = EventReader.readCounters(jp); - break; - default: - throw new IOException("Unrecognized field '"+fieldname+"'!"); - } - } - } - - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(), - attemptId.toString()); - gen.writeStringField(EventFields.TASK_TYPE.toString(), - taskType.toString()); - gen.writeStringField(EventFields.TASK_STATUS.toString(), - taskStatus); - gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime); - gen.writeStringField(EventFields.HOSTNAME.toString(), hostname); - gen.writeStringField(EventFields.STATE.toString(), state); - EventWriter.writeCounters(counters, gen); - gen.writeEndObject(); - } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Fri Sep 18 22:22:37 2009 @@ -23,31 +23,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record start of a task attempt * */ public class TaskAttemptStartedEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private TaskType taskType; - private TaskAttemptID attemptId; - private long startTime; - private String trackerName; - private int httpPort; - - enum EventFields { EVENT_CATEGORY, - TASK_ID, - TASK_TYPE, - TASK_ATTEMPT_ID, - START_TIME, - TRACKER_NAME, - HTTP_PORT } + private Events.TaskAttemptStarted datum = new Events.TaskAttemptStarted(); /** * Create an event to record the start of an attempt @@ -60,85 +44,40 @@ public TaskAttemptStartedEvent( TaskAttemptID attemptId, TaskType taskType, long startTime, String trackerName, int httpPort) { - this.attemptId = attemptId; - this.taskid = attemptId.getTaskID(); - this.startTime = startTime; - this.taskType = taskType; - this.trackerName = trackerName; - this.httpPort = httpPort; - this.category = EventCategory.TASK_ATTEMPT; + datum.attemptId = new Utf8(attemptId.toString()); + datum.taskid = new Utf8(attemptId.getTaskID().toString()); + datum.startTime = startTime; + datum.taskType = new Utf8(taskType.name()); + datum.trackerName = new Utf8(trackerName); + datum.httpPort = httpPort; } - TaskAttemptStartedEvent() { + TaskAttemptStartedEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { + this.datum = (Events.TaskAttemptStarted)datum; } /** Get the task id */ - public TaskID getTaskId() { return taskid; } - /** Get the event category */ - public EventCategory getEventCategory() { return category; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the tracker name */ - public String getTrackerName() { return trackerName; } + public String getTrackerName() { return datum.trackerName.toString(); } /** Get the start time */ - public long getStartTime() { return startTime; } + public long getStartTime() { return datum.startTime; } /** Get the task type */ - public TaskType getTaskType() { return taskType; } + public TaskType getTaskType() { + return TaskType.valueOf(datum.taskType.toString()); + } /** Get the HTTP port */ - public int getHttpPort() { return httpPort; } + public int getHttpPort() { return datum.httpPort; } /** Get the attempt id */ - public TaskAttemptID getTaskAttemptId() { return attemptId; } - /** Get the event type */ - public EventType getEventType() { - return EventType.MAP_ATTEMPT_STARTED; + public TaskAttemptID getTaskAttemptId() { + return TaskAttemptID.forName(datum.attemptId.toString()); } - - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected Token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldName = jp.getCurrentName(); - jp.nextToken(); // Move to the value - switch (Enum.valueOf(EventFields.class, fieldName)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case TASK_ATTEMPT_ID: - attemptId = TaskAttemptID.forName(jp.getText()); - break; - case TASK_TYPE: - taskType = TaskType.valueOf(jp.getText()); - break; - case START_TIME: - startTime = jp.getLongValue(); - break; - case TRACKER_NAME: - trackerName = jp.getText(); - break; - case HTTP_PORT: - httpPort = jp.getIntValue(); - break; - default: - throw new IOException("Unrecognized field '"+fieldName+"'!"); - } - } + /** Get the event type */ + public Events.EventType getEventType() { + return Events.EventType.MAP_ATTEMPT_STARTED; } - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(), - attemptId.toString()); - gen.writeStringField(EventFields.TASK_TYPE.toString(), - taskType.toString()); - gen.writeNumberField(EventFields.START_TIME.toString(), startTime); - gen.writeStringField(EventFields.TRACKER_NAME.toString(), trackerName); - gen.writeNumberField(EventFields.HTTP_PORT.toString(), httpPort); - gen.writeEndObject(); - } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Fri Sep 18 22:22:37 2009 @@ -23,33 +23,16 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record unsuccessful (Killed/Failed) completion of task attempts * */ public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private TaskType taskType; - private TaskAttemptID attemptId; - private long finishTime; - private String hostname; - private String status; - private String error; - - enum EventFields { EVENT_CATEGORY, - TASK_ID, - TASK_TYPE, - TASK_ATTEMPT_ID, - FINISH_TIME, - HOSTNAME, - STATUS, - ERROR } + private Events.TaskAttemptUnsuccessfulCompletion datum = + new Events.TaskAttemptUnsuccessfulCompletion(); /** * Create an event to record the unsuccessful completion of attempts @@ -64,92 +47,43 @@ TaskType taskType, String status, long finishTime, String hostname, String error) { - this.taskid = id.getTaskID(); - this.taskType = taskType; - this.attemptId = id; - this.finishTime = finishTime; - this.hostname = hostname; - this.error = error; - this.status = status; - this.category = EventCategory.TASK_ATTEMPT; + datum.taskid = new Utf8(id.getTaskID().toString()); + datum.taskType = new Utf8(taskType.name()); + datum.attemptId = new Utf8(id.toString()); + datum.finishTime = finishTime; + datum.hostname = new Utf8(hostname); + datum.error = new Utf8(error); + datum.status = new Utf8(status); } - TaskAttemptUnsuccessfulCompletionEvent() { + TaskAttemptUnsuccessfulCompletionEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { + this.datum = (Events.TaskAttemptUnsuccessfulCompletion)datum; } - /** Get the event category */ - public EventCategory getEventCategory() { return category; } /** Get the task id */ - public TaskID getTaskId() { return taskid; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the task type */ - public TaskType getTaskType() { return taskType; } + public TaskType getTaskType() { + return TaskType.valueOf(datum.taskType.toString()); + } /** Get the attempt id */ - public TaskAttemptID getTaskAttemptId() { return attemptId; } + public TaskAttemptID getTaskAttemptId() { + return TaskAttemptID.forName(datum.attemptId.toString()); + } /** Get the finish time */ - public long getFinishTime() { return finishTime; } + public long getFinishTime() { return datum.finishTime; } /** Get the name of the host where the attempt executed */ - public String getHostname() { return hostname; } + public String getHostname() { return datum.hostname.toString(); } /** Get the error string */ - public String getError() { return error; } + public String getError() { return datum.error.toString(); } /** Get the task status */ - public String getTaskStatus() { return status; } + public String getTaskStatus() { return datum.status.toString(); } /** Get the event type */ - public EventType getEventType() { - return EventType.MAP_ATTEMPT_KILLED; - } - - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected Token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldName = jp.getCurrentName(); - jp.nextToken(); // Move to the value - switch (Enum.valueOf(EventFields.class, fieldName)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case TASK_TYPE: - taskType = TaskType.valueOf(jp.getText()); - break; - case TASK_ATTEMPT_ID: - attemptId = TaskAttemptID.forName(jp.getText()); - break; - case FINISH_TIME: - finishTime = jp.getLongValue(); - break; - case HOSTNAME: - hostname = jp.getText(); - break; - case ERROR: - error = jp.getText(); - break; - case STATUS: - status = jp.getText(); - break; - default: - throw new IOException("Unrecognized field '"+fieldName+"'!"); - } - } + public Events.EventType getEventType() { + return Events.EventType.MAP_ATTEMPT_KILLED; } - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeStringField(EventFields.TASK_TYPE.toString(), - taskType.toString()); - gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(), - attemptId.toString()); - gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime); - gen.writeStringField(EventFields.HOSTNAME.toString(), hostname); - gen.writeStringField(EventFields.ERROR.toString(), error); - gen.writeStringField(EventFields.STATUS.toString(), status); - gen.writeEndObject(); - } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java Fri Sep 18 22:22:37 2009 @@ -23,31 +23,15 @@ import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record the failure of a task * */ public class TaskFailedEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private TaskType taskType; - private long finishTime; - private String error; - private TaskAttemptID failedDueToAttempt; - private String status; - - enum EventFields { EVENT_CATEGORY, - TASK_ID, - TASK_TYPE, - FINISH_TIME, - ERROR, - STATUS, - FAILED_ATTEMPT_ID } + private Events.TaskFailed datum = new Events.TaskFailed(); /** * Create an event to record task failure @@ -61,86 +45,41 @@ public TaskFailedEvent(TaskID id, long finishTime, TaskType taskType, String error, String status, TaskAttemptID failedDueToAttempt) { - this.taskid = id; - this.error = error; - this.finishTime = finishTime; - this.taskType = taskType; - this.failedDueToAttempt = failedDueToAttempt; - this.category = EventCategory.TASK; - this.status = status; + datum.taskid = new Utf8(id.toString()); + datum.error = new Utf8(error); + datum.finishTime = finishTime; + datum.taskType = new Utf8(taskType.name()); + datum.failedDueToAttempt = failedDueToAttempt == null + ? null + : new Utf8(failedDueToAttempt.toString()); + datum.status = new Utf8(status); } - TaskFailedEvent() { - } + TaskFailedEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { this.datum = (Events.TaskFailed)datum; } /** Get the task id */ - public TaskID getTaskId() { return taskid; } - /** Get the event category */ - public EventCategory getEventCategory() { return category; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the error string */ - public String getError() { return error; } + public String getError() { return datum.error.toString(); } /** Get the finish time of the attempt */ - public long getFinishTime() { return finishTime; } + public long getFinishTime() { return datum.finishTime; } /** Get the task type */ - public TaskType getTaskType() { return taskType; } + public TaskType getTaskType() { + return TaskType.valueOf(datum.taskType.toString()); + } /** Get the attempt id due to which the task failed */ - public TaskAttemptID getFailedAttemptID() { return failedDueToAttempt; } + public TaskAttemptID getFailedAttemptID() { + return datum.failedDueToAttempt == null + ? null + : TaskAttemptID.forName(datum.failedDueToAttempt.toString()); + } /** Get the task status */ - public String getTaskStatus() { return status; } + public String getTaskStatus() { return datum.status.toString(); } /** Get the event type */ - public EventType getEventType() { return EventType.TASK_FAILED; } + public Events.EventType getEventType() { return Events.EventType.TASK_FAILED; } - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected Token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldName = jp.getCurrentName(); - jp.nextToken(); // Move to the value - switch (Enum.valueOf(EventFields.class, fieldName)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case TASK_TYPE: - taskType = TaskType.valueOf(jp.getText()); - break; - case FINISH_TIME: - finishTime = jp.getLongValue(); - break; - case ERROR: - error = jp.getText(); - break; - case STATUS: - status = jp.getText(); - break; - case FAILED_ATTEMPT_ID: - failedDueToAttempt = TaskAttemptID.forName(jp.getText()); - break; - default: - throw new IOException("Unrecognized field '"+fieldName+"'!"); - } - } - } - - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeStringField(EventFields.TASK_TYPE.toString(), - taskType.toString()); - gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime); - gen.writeStringField(EventFields.ERROR.toString(), error); - gen.writeStringField(EventFields.STATUS.toString(), status); - if (failedDueToAttempt != null) { - gen.writeStringField(EventFields.FAILED_ATTEMPT_ID.toString(), - failedDueToAttempt.toString()); - } - gen.writeEndObject(); - } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java Fri Sep 18 22:22:37 2009 @@ -23,33 +23,16 @@ import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record the successful completion of a task * */ public class TaskFinishedEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private TaskType taskType; - private long finishTime; - private String status; - private Counters counters; + private Events.TaskFinished datum = new Events.TaskFinished(); - enum EventFields { EVENT_CATEGORY, - TASK_ID, - TASK_TYPE, - FINISH_TIME, - STATUS, - COUNTERS } - - TaskFinishedEvent() { - } - /** * Create an event to record the successful completion of a task * @param id Task ID @@ -61,75 +44,36 @@ public TaskFinishedEvent(TaskID id, long finishTime, TaskType taskType, String status, Counters counters) { - this.taskid = id; - this.finishTime = finishTime; - this.counters = counters; - this.taskType = taskType; - this.status = status; - this.category = EventCategory.TASK; + datum.taskid = new Utf8(id.toString()); + datum.finishTime = finishTime; + datum.counters = EventWriter.toAvro(counters); + datum.taskType = new Utf8(taskType.name()); + datum.status = new Utf8(status); } + TaskFinishedEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { + this.datum = (Events.TaskFinished)datum; + } + /** Get task id */ - public TaskID getTaskId() { return taskid; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the task finish time */ - public long getFinishTime() { return finishTime; } + public long getFinishTime() { return datum.finishTime; } /** Get task counters */ - public Counters getCounters() { return counters; } + Counters getCounters() { return EventReader.fromAvro(datum.counters); } /** Get task type */ - public TaskType getTaskType() { return taskType; } + public TaskType getTaskType() { + return TaskType.valueOf(datum.taskType.toString()); + } /** Get task status */ - public String getTaskStatus() { return status; } + public String getTaskStatus() { return datum.status.toString(); } /** Get event type */ - public EventType getEventType() { - return EventType.TASK_FINISHED; - } - /** Get Event Category */ - public EventCategory getEventCategory() { return category; } - - - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldname = jp.getCurrentName(); - jp.nextToken(); // move to value - switch (Enum.valueOf(EventFields.class, fieldname)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case TASK_TYPE: - taskType = TaskType.valueOf(jp.getText()); - break; - case FINISH_TIME: - finishTime = jp.getLongValue(); - break; - case STATUS: - status = jp.getText(); - break; - case COUNTERS: - counters = EventReader.readCounters(jp); - break; - default: - throw new IOException("Unrecognized field '"+fieldname+"'!"); - } - } + public Events.EventType getEventType() { + return Events.EventType.TASK_FINISHED; } - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeStringField(EventFields.TASK_TYPE.toString(), - taskType.toString()); - gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime); - gen.writeStringField(EventFields.STATUS.toString(), status); - EventWriter.writeCounters(counters, gen); - gen.writeEndObject(); - } + } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java Fri Sep 18 22:22:37 2009 @@ -22,27 +22,15 @@ import org.apache.hadoop.mapreduce.TaskID; import org.apache.hadoop.mapreduce.TaskType; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record the start of a task * */ public class TaskStartedEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private TaskType taskType; - private long startTime; - private String splitLocations; - - enum EventFields { EVENT_CATEGORY, - TASK_ID, - TASK_TYPE, - START_TIME, - SPLIT_LOCATIONS } + private Events.TaskStarted datum = new Events.TaskStarted(); /** * Create an event to record start of a task @@ -53,71 +41,30 @@ */ public TaskStartedEvent(TaskID id, long startTime, TaskType taskType, String splitLocations) { - this.taskid = id; - this.splitLocations = splitLocations; - this.startTime = startTime; - this.taskType = taskType; - this.category = EventCategory.TASK; + datum.taskid = new Utf8(id.toString()); + datum.splitLocations = new Utf8(splitLocations); + datum.startTime = startTime; + datum.taskType = new Utf8(taskType.name()); } - TaskStartedEvent() { - } + TaskStartedEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { this.datum = (Events.TaskStarted)datum; } /** Get the task id */ - public TaskID getTaskId() { return taskid; } - /** Get the event category */ - public EventCategory getEventCategory() { return category; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the split locations, applicable for map tasks */ - public String getSplitLocations() { return splitLocations; } + public String getSplitLocations() { return datum.splitLocations.toString(); } /** Get the start time of the task */ - public long getStartTime() { return startTime; } + public long getStartTime() { return datum.startTime; } /** Get the task type */ - public TaskType getTaskType() { return taskType; } - /** Get the event type */ - public EventType getEventType() { - return EventType.TASK_STARTED; + public TaskType getTaskType() { + return TaskType.valueOf(datum.taskType.toString()); } - - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected Token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldName = jp.getCurrentName(); - jp.nextToken(); // Move to the value - switch (Enum.valueOf(EventFields.class, fieldName)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case TASK_TYPE: - taskType = TaskType.valueOf(jp.getText()); - break; - case START_TIME: - startTime = jp.getLongValue(); - break; - case SPLIT_LOCATIONS: - splitLocations = jp.getText(); - break; - default: - throw new IOException("Unrecognized field '"+fieldName+"'!"); - } - } + /** Get the event type */ + public Events.EventType getEventType() { + return Events.EventType.TASK_STARTED; } - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeStringField(EventFields.TASK_TYPE.toString(), - taskType.toString()); - gen.writeNumberField(EventFields.START_TIME.toString(), startTime); - gen.writeStringField(EventFields.SPLIT_LOCATIONS.toString(), - splitLocations); - gen.writeEndObject(); - } } Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java?rev=816801&r1=816800&r2=816801&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java Fri Sep 18 22:22:37 2009 @@ -21,23 +21,15 @@ import java.io.IOException; import org.apache.hadoop.mapreduce.TaskID; -import org.codehaus.jackson.JsonGenerator; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; + +import org.apache.avro.util.Utf8; /** * Event to record updates to a task * */ public class TaskUpdatedEvent implements HistoryEvent { - - private EventCategory category; - private TaskID taskid; - private long finishTime; - - enum EventFields { EVENT_CATEGORY, - TASK_ID, - FINISH_TIME } + private Events.TaskUpdated datum = new Events.TaskUpdated(); /** * Create an event to record task updates @@ -45,54 +37,22 @@ * @param finishTime Finish time of the task */ public TaskUpdatedEvent(TaskID id, long finishTime) { - this.taskid = id; - this.finishTime = finishTime; - this.category = EventCategory.TASK; + datum.taskid = new Utf8(id.toString()); + datum.finishTime = finishTime; } - TaskUpdatedEvent() { - } + TaskUpdatedEvent() {} + + public Object getDatum() { return datum; } + public void setDatum(Object datum) { this.datum = (Events.TaskUpdated)datum; } + /** Get the task ID */ - public TaskID getTaskId() { return taskid; } - /** Get the event category */ - public EventCategory getEventCategory() { return category; } + public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); } /** Get the task finish time */ - public long getFinishTime() { return finishTime; } + public long getFinishTime() { return datum.finishTime; } /** Get the event type */ - public EventType getEventType() { - return EventType.TASK_UPDATED; - } - - public void readFields(JsonParser jp) throws IOException { - if (jp.nextToken() != JsonToken.START_OBJECT) { - throw new IOException("Unexpected Token while reading"); - } - - while (jp.nextToken() != JsonToken.END_OBJECT) { - String fieldName = jp.getCurrentName(); - jp.nextToken(); // Move to the value - switch (Enum.valueOf(EventFields.class, fieldName)) { - case EVENT_CATEGORY: - category = Enum.valueOf(EventCategory.class, jp.getText()); - break; - case TASK_ID: - taskid = TaskID.forName(jp.getText()); - break; - case FINISH_TIME: - finishTime = jp.getLongValue(); - break; - default: - throw new IOException("Unrecognized field '"+fieldName+"'!"); - } - } + public Events.EventType getEventType() { + return Events.EventType.TASK_UPDATED; } - public void writeFields(JsonGenerator gen) throws IOException { - gen.writeStartObject(); - gen.writeStringField(EventFields.EVENT_CATEGORY.toString(), - category.toString()); - gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString()); - gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime); - gen.writeEndObject(); - } }