hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r816801 [2/2] - in /hadoop/mapreduce/trunk: ./ ivy/ src/contrib/capacity-scheduler/ src/contrib/fairscheduler/ src/contrib/gridmix/ src/contrib/mrunit/ src/contrib/sqoop/ src/contrib/streaming/ src/java/org/apache/hadoop/mapreduce/ src/java...
Date Fri, 18 Sep 2009 22:22:39 GMT
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
Fri Sep 18 22:22:37 2009
@@ -24,42 +24,16 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record successful completion of a reduce attempt
  *
  */
 public class ReduceAttemptFinishedEvent  implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskAttemptID attemptId;
-  private TaskType taskType;
-  private String taskStatus;
-  private long shuffleFinishTime;
-  private long sortFinishTime;
-  private long finishTime;
-  private String hostname;
-  private String state;
-  private Counters counters;
-
-  enum EventFields { EVENT_CATEGORY,
-    TASK_ID,
-    TASK_ATTEMPT_ID,
-    TASK_TYPE,
-    TASK_STATUS,
-    SHUFFLE_FINISH_TIME,
-    SORT_FINISH_TIME,
-    FINISH_TIME,
-    HOSTNAME,
-    STATE,
-    COUNTERS }
-
-  ReduceAttemptFinishedEvent() {
-  }
+  private Events.ReduceAttemptFinished datum =
+    new Events.ReduceAttemptFinished();
 
   /**
    * Create an event to record completion of a reduce attempt
@@ -78,114 +52,53 @@
       long shuffleFinishTime, long sortFinishTime, 
       long finishTime,
       String hostname, String state, Counters counters) {
-    this.taskid = id.getTaskID();
-    this.attemptId = id;
-    this.taskType = taskType;
-    this.taskStatus = taskStatus;
-    this.shuffleFinishTime = shuffleFinishTime;
-    this.sortFinishTime = sortFinishTime;
-    this.finishTime = finishTime;
-    this.hostname = hostname;
-    this.state = state;
-    this.counters = counters;
-    this.category = EventCategory.TASK_ATTEMPT;
+    datum.taskid = new Utf8(id.getTaskID().toString());
+    datum.attemptId = new Utf8(id.toString());
+    datum.taskType = new Utf8(taskType.name());
+    datum.taskStatus = new Utf8(taskStatus);
+    datum.shuffleFinishTime = shuffleFinishTime;
+    datum.sortFinishTime = sortFinishTime;
+    datum.finishTime = finishTime;
+    datum.hostname = new Utf8(hostname);
+    datum.state = new Utf8(state);
+    datum.counters = EventWriter.toAvro(counters);
+  }
+
+  ReduceAttemptFinishedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.ReduceAttemptFinished)datum;
   }
 
   /** Get the Task ID */
-  public TaskID getTaskId() { return taskid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the attempt id */
-  public TaskAttemptID getAttemptId() { return attemptId; }
+  public TaskAttemptID getAttemptId() {
+    return TaskAttemptID.forName(datum.attemptId.toString());
+  }
   /** Get the task type */
-  public TaskType getTaskType() { return taskType; }
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
+  }
   /** Get the task status */
-  public String getTaskStatus() { return taskStatus; }
+  public String getTaskStatus() { return datum.taskStatus.toString(); }
   /** Get the finish time of the sort phase */
-  public long getSortFinishTime() { return sortFinishTime; }
+  public long getSortFinishTime() { return datum.sortFinishTime; }
   /** Get the finish time of the shuffle phase */
-  public long getShuffleFinishTime() { return shuffleFinishTime; }
+  public long getShuffleFinishTime() { return datum.shuffleFinishTime; }
   /** Get the finish time of the attempt */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the name of the host where the attempt ran */
-  public String getHostname() { return hostname; }
+  public String getHostname() { return datum.hostname.toString(); }
   /** Get the state string */
-  public String getState() { return state; }
+  public String getState() { return datum.state.toString(); }
   /** Get the counters for the attempt */
-  public Counters getCounters() { return counters; }
+  Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get the event type */
-  public EventType getEventType() {
-    return EventType.REDUCE_ATTEMPT_FINISHED;
+  public Events.EventType getEventType() {
+    return Events.EventType.REDUCE_ATTEMPT_FINISHED;
   }
 
 
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected token while reading");
-    }
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldname = jp.getCurrentName();
-      jp.nextToken(); // move to value
-      switch (Enum.valueOf(EventFields.class, fieldname)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_ATTEMPT_ID:
-        attemptId = TaskAttemptID.forName(jp.getText());
-        break;
-      case TASK_TYPE:
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case TASK_STATUS:
-        taskStatus = jp.getText();
-        break;
-      case SHUFFLE_FINISH_TIME:
-        shuffleFinishTime = jp.getLongValue();
-        break;
-      case SORT_FINISH_TIME:
-        sortFinishTime = jp.getLongValue();
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case HOSTNAME:
-        hostname = jp.getText();
-        break;
-      case STATE:
-        state = jp.getText();
-        break;
-      case COUNTERS:
-        counters = EventReader.readCounters(jp);
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldname+"'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(),
-        attemptId.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(),
-        taskType.toString());
-    gen.writeStringField(EventFields.TASK_STATUS.toString(),
-        taskStatus);
-    gen.writeNumberField(EventFields.SHUFFLE_FINISH_TIME.toString(),
-        shuffleFinishTime);
-    gen.writeNumberField(EventFields.SORT_FINISH_TIME.toString(),
-        sortFinishTime);
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeStringField(EventFields.HOSTNAME.toString(), hostname);
-    gen.writeStringField(EventFields.STATE.toString(), state);
-    EventWriter.writeCounters(counters, gen);
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
Fri Sep 18 22:22:37 2009
@@ -24,38 +24,15 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record successful task completion
  *
  */
 public class TaskAttemptFinishedEvent  implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskAttemptID attemptId;
-  private TaskType taskType;
-  private String taskStatus;
-  private long finishTime;
-  private String hostname;
-  private String state;
-  private Counters counters;
-
-  enum EventFields { EVENT_CATEGORY,
-    TASK_ID,
-    TASK_ATTEMPT_ID,
-    TASK_TYPE,
-    TASK_STATUS,
-    FINISH_TIME,
-    HOSTNAME,
-    STATE,
-    COUNTERS }
-
-  TaskAttemptFinishedEvent() {
-  }
+  private Events.TaskAttemptFinished datum = new Events.TaskAttemptFinished();
 
   /**
    * Create an event to record successful finishes for setup and cleanup 
@@ -72,97 +49,46 @@
       TaskType taskType, String taskStatus, 
       long finishTime,
       String hostname, String state, Counters counters) {
-    this.taskid = id.getTaskID();
-    this.attemptId = id;
-    this.taskType = taskType;
-    this.taskStatus = taskStatus;
-    this.finishTime = finishTime;
-    this.hostname = hostname;
-    this.state = state;
-    this.counters = counters;
-    this.category = EventCategory.TASK_ATTEMPT;
+    datum.taskid = new Utf8(id.getTaskID().toString());
+    datum.attemptId = new Utf8(id.toString());
+    datum.taskType = new Utf8(taskType.name());
+    datum.taskStatus = new Utf8(taskStatus);
+    datum.finishTime = finishTime;
+    datum.hostname = new Utf8(hostname);
+    datum.state = new Utf8(state);
+    datum.counters = EventWriter.toAvro(counters);
+  }
+
+  TaskAttemptFinishedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.TaskAttemptFinished)datum;
   }
 
   /** Get the task ID */
-  public TaskID getTaskId() { return taskid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the task attempt id */
-  public TaskAttemptID getAttemptId() { return attemptId; }
+  public TaskAttemptID getAttemptId() {
+    return TaskAttemptID.forName(datum.attemptId.toString());
+  }
   /** Get the task type */
-  public TaskType getTaskType() { return taskType; }
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
+  }
   /** Get the task status */
-  public String getTaskStatus() { return taskStatus; }
+  public String getTaskStatus() { return datum.taskStatus.toString(); }
   /** Get the attempt finish time */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the host where the attempt executed */
-  public String getHostname() { return hostname; }
+  public String getHostname() { return datum.hostname.toString(); }
   /** Get the state string */
-  public String getState() { return state; }
+  public String getState() { return datum.state.toString(); }
   /** Get the counters for the attempt */
-  public Counters getCounters() { return counters; }
+  Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get the event type */
-  public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_FINISHED;
+  public Events.EventType getEventType() {
+    return Events.EventType.MAP_ATTEMPT_FINISHED;
   }
 
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected token while reading");
-    }
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldname = jp.getCurrentName();
-      jp.nextToken(); // move to value
-      switch (Enum.valueOf(EventFields.class, fieldname)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_ATTEMPT_ID: 
-        attemptId = TaskAttemptID.forName(jp.getText());
-        break;
-      case TASK_TYPE:
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case TASK_STATUS:
-        taskStatus = jp.getText();
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case HOSTNAME:
-        hostname = jp.getText();
-        break;
-      case STATE:
-        state = jp.getText();
-        break;
-      case COUNTERS:
-        counters = EventReader.readCounters(jp);
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldname+"'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(),
-        attemptId.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(), 
-        taskType.toString());
-    gen.writeStringField(EventFields.TASK_STATUS.toString(),
-        taskStatus);
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeStringField(EventFields.HOSTNAME.toString(), hostname);
-    gen.writeStringField(EventFields.STATE.toString(), state);
-    EventWriter.writeCounters(counters, gen);
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
Fri Sep 18 22:22:37 2009
@@ -23,31 +23,15 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record start of a task attempt
  *
  */
 public class TaskAttemptStartedEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskType taskType;
-  private TaskAttemptID attemptId;
-  private  long startTime;
-  private  String trackerName;
-  private int httpPort;
-
-  enum EventFields { EVENT_CATEGORY,
-                     TASK_ID,
-                     TASK_TYPE,
-                     TASK_ATTEMPT_ID,
-                     START_TIME,
-                     TRACKER_NAME,
-                     HTTP_PORT }
+  private Events.TaskAttemptStarted datum = new Events.TaskAttemptStarted();
 
   /**
    * Create an event to record the start of an attempt
@@ -60,85 +44,40 @@
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,  
       TaskType taskType, long startTime, String trackerName,
       int httpPort) {
-    this.attemptId = attemptId;
-    this.taskid = attemptId.getTaskID();
-    this.startTime = startTime;
-    this.taskType = taskType;
-    this.trackerName = trackerName;
-    this.httpPort = httpPort;
-    this.category = EventCategory.TASK_ATTEMPT;
+    datum.attemptId = new Utf8(attemptId.toString());
+    datum.taskid = new Utf8(attemptId.getTaskID().toString());
+    datum.startTime = startTime;
+    datum.taskType = new Utf8(taskType.name());
+    datum.trackerName = new Utf8(trackerName);
+    datum.httpPort = httpPort;
   }
 
-  TaskAttemptStartedEvent() {
+  TaskAttemptStartedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.TaskAttemptStarted)datum;
   }
 
   /** Get the task id */
-  public TaskID getTaskId() { return taskid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the tracker name */
-  public String getTrackerName() { return trackerName; }
+  public String getTrackerName() { return datum.trackerName.toString(); }
   /** Get the start time */
-  public long getStartTime() { return startTime; }
+  public long getStartTime() { return datum.startTime; }
   /** Get the task type */
-  public TaskType getTaskType() { return taskType; }
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
+  }
   /** Get the HTTP port */
-  public int getHttpPort() { return httpPort; }
+  public int getHttpPort() { return datum.httpPort; }
   /** Get the attempt id */
-  public TaskAttemptID getTaskAttemptId() { return attemptId; }
-  /** Get the event type */
-  public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_STARTED;
+  public TaskAttemptID getTaskAttemptId() {
+    return TaskAttemptID.forName(datum.attemptId.toString());
   }
-
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_ATTEMPT_ID: 
-        attemptId = TaskAttemptID.forName(jp.getText());
-        break;
-      case TASK_TYPE:
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case START_TIME:
-        startTime = jp.getLongValue();
-        break;
-      case TRACKER_NAME:
-        trackerName = jp.getText();
-        break;
-      case HTTP_PORT:
-        httpPort = jp.getIntValue();
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
+  /** Get the event type */
+  public Events.EventType getEventType() {
+    return Events.EventType.MAP_ATTEMPT_STARTED;
   }
 
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(),
-        attemptId.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(),
-        taskType.toString());
-    gen.writeNumberField(EventFields.START_TIME.toString(), startTime);
-    gen.writeStringField(EventFields.TRACKER_NAME.toString(), trackerName);
-    gen.writeNumberField(EventFields.HTTP_PORT.toString(), httpPort);
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
Fri Sep 18 22:22:37 2009
@@ -23,33 +23,16 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record unsuccessful (Killed/Failed) completion of task attempts
  *
  */
 public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskType taskType;
-  private TaskAttemptID attemptId;
-  private  long finishTime;
-  private String hostname;
-  private String status;
-  private  String error;
-
-  enum EventFields { EVENT_CATEGORY,
-    TASK_ID,
-    TASK_TYPE,
-    TASK_ATTEMPT_ID,
-    FINISH_TIME,
-    HOSTNAME,
-    STATUS,
-    ERROR }
+  private Events.TaskAttemptUnsuccessfulCompletion datum =
+    new Events.TaskAttemptUnsuccessfulCompletion();
 
   /** 
    * Create an event to record the unsuccessful completion of attempts
@@ -64,92 +47,43 @@
       TaskType taskType,
       String status, long finishTime, 
       String hostname, String error) {
-    this.taskid = id.getTaskID();
-    this.taskType = taskType;
-    this.attemptId = id;
-    this.finishTime = finishTime;
-    this.hostname = hostname;
-    this.error = error;
-    this.status = status;
-    this.category = EventCategory.TASK_ATTEMPT;
+    datum.taskid = new Utf8(id.getTaskID().toString());
+    datum.taskType = new Utf8(taskType.name());
+    datum.attemptId = new Utf8(id.toString());
+    datum.finishTime = finishTime;
+    datum.hostname = new Utf8(hostname);
+    datum.error = new Utf8(error);
+    datum.status = new Utf8(status);
   }
 
-  TaskAttemptUnsuccessfulCompletionEvent() {
+  TaskAttemptUnsuccessfulCompletionEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.TaskAttemptUnsuccessfulCompletion)datum;
   }
 
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
   /** Get the task id */
-  public TaskID getTaskId() { return taskid; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the task type */
-  public TaskType getTaskType() { return taskType; }
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
+  }
   /** Get the attempt id */
-  public TaskAttemptID getTaskAttemptId() { return attemptId; }
+  public TaskAttemptID getTaskAttemptId() {
+    return TaskAttemptID.forName(datum.attemptId.toString());
+  }
   /** Get the finish time */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the name of the host where the attempt executed */
-  public String getHostname() { return hostname; }
+  public String getHostname() { return datum.hostname.toString(); }
   /** Get the error string */
-  public String getError() { return error; }
+  public String getError() { return datum.error.toString(); }
   /** Get the task status */
-  public String getTaskStatus() { return status; }
+  public String getTaskStatus() { return datum.status.toString(); }
   /** Get the event type */
-  public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_KILLED;
-  }
-
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_TYPE: 
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case TASK_ATTEMPT_ID: 
-        attemptId = TaskAttemptID.forName(jp.getText());
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case HOSTNAME:
-        hostname = jp.getText();
-        break;
-      case ERROR:
-        error = jp.getText();
-        break;
-      case STATUS:
-        status = jp.getText();
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
+  public Events.EventType getEventType() {
+    return Events.EventType.MAP_ATTEMPT_KILLED;
   }
 
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(),
-        taskType.toString());
-    gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(),
-        attemptId.toString());
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeStringField(EventFields.HOSTNAME.toString(), hostname);
-    gen.writeStringField(EventFields.ERROR.toString(), error);
-    gen.writeStringField(EventFields.STATUS.toString(), status);
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
Fri Sep 18 22:22:37 2009
@@ -23,31 +23,15 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record the failure of a task
  *
  */
 public class TaskFailedEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskType taskType;
-  private  long finishTime;
-  private  String error;
-  private TaskAttemptID failedDueToAttempt;
-  private String status;
-
-  enum EventFields { EVENT_CATEGORY,
-                     TASK_ID,
-                     TASK_TYPE,
-                     FINISH_TIME,
-                     ERROR,
-                     STATUS,
-                     FAILED_ATTEMPT_ID }
+  private Events.TaskFailed datum = new Events.TaskFailed();
 
   /**
    * Create an event to record task failure
@@ -61,86 +45,41 @@
   public TaskFailedEvent(TaskID id, long finishTime, 
       TaskType taskType, String error, String status,
       TaskAttemptID failedDueToAttempt) {
-    this.taskid = id;
-    this.error = error;
-    this.finishTime = finishTime;
-    this.taskType = taskType;
-    this.failedDueToAttempt = failedDueToAttempt;
-    this.category = EventCategory.TASK;
-    this.status = status;
+    datum.taskid = new Utf8(id.toString());
+    datum.error = new Utf8(error);
+    datum.finishTime = finishTime;
+    datum.taskType = new Utf8(taskType.name());
+    datum.failedDueToAttempt = failedDueToAttempt == null
+      ? null
+      : new Utf8(failedDueToAttempt.toString());
+    datum.status = new Utf8(status);
   }
 
-  TaskFailedEvent() {
-  }
+  TaskFailedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) { this.datum = (Events.TaskFailed)datum; }
 
   /** Get the task id */
-  public TaskID getTaskId() { return taskid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the error string */
-  public String getError() { return error; }
+  public String getError() { return datum.error.toString(); }
   /** Get the finish time of the attempt */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the task type */
-  public TaskType getTaskType() { return taskType; }
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
+  }
   /** Get the attempt id due to which the task failed */
-  public TaskAttemptID getFailedAttemptID() { return failedDueToAttempt; }
+  public TaskAttemptID getFailedAttemptID() {
+    return datum.failedDueToAttempt == null
+      ? null
+      : TaskAttemptID.forName(datum.failedDueToAttempt.toString());
+  }
   /** Get the task status */
-  public String getTaskStatus() { return status; }
+  public String getTaskStatus() { return datum.status.toString(); }
   /** Get the event type */
-  public EventType getEventType() { return EventType.TASK_FAILED; }
+  public Events.EventType getEventType() { return Events.EventType.TASK_FAILED; }
 
   
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_TYPE:
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case ERROR:
-        error = jp.getText();
-        break;
-      case STATUS:
-        status = jp.getText();
-        break;
-      case FAILED_ATTEMPT_ID:
-        failedDueToAttempt = TaskAttemptID.forName(jp.getText());
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(),
-        taskType.toString());
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeStringField(EventFields.ERROR.toString(), error);
-    gen.writeStringField(EventFields.STATUS.toString(), status);
-    if (failedDueToAttempt != null) {
-      gen.writeStringField(EventFields.FAILED_ATTEMPT_ID.toString(),
-          failedDueToAttempt.toString());
-    }
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
Fri Sep 18 22:22:37 2009
@@ -23,33 +23,16 @@
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record the successful completion of a task
  *
  */
 public class TaskFinishedEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskType taskType;
-  private long finishTime;
-  private String status;
-  private Counters counters;
+  private Events.TaskFinished datum = new Events.TaskFinished();
   
-  enum EventFields { EVENT_CATEGORY,
-                     TASK_ID,
-                     TASK_TYPE,
-                     FINISH_TIME,
-                     STATUS,
-                     COUNTERS }
-    
-  TaskFinishedEvent() {
-  }
-
   /**
    * Create an event to record the successful completion of a task
    * @param id Task ID
@@ -61,75 +44,36 @@
   public TaskFinishedEvent(TaskID id, long finishTime,
                            TaskType taskType,
                            String status, Counters counters) {
-    this.taskid = id;
-    this.finishTime = finishTime;
-    this.counters = counters;
-    this.taskType = taskType;
-    this.status = status;
-    this.category = EventCategory.TASK;
+    datum.taskid = new Utf8(id.toString());
+    datum.finishTime = finishTime;
+    datum.counters = EventWriter.toAvro(counters);
+    datum.taskType = new Utf8(taskType.name());
+    datum.status = new Utf8(status);
   }
   
+  TaskFinishedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.TaskFinished)datum;
+  }
+
   /** Get task id */
-  public TaskID getTaskId() { return taskid; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the task finish time */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get task counters */
-  public Counters getCounters() { return counters; }
+  Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get task type */
-  public TaskType getTaskType() { return taskType; }
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
+  }
   /** Get task status */
-  public String getTaskStatus() { return status; }
+  public String getTaskStatus() { return datum.status.toString(); }
   /** Get event type */
-  public EventType getEventType() {
-    return EventType.TASK_FINISHED;
-  }
-  /** Get Event Category */
-  public EventCategory getEventCategory() { return category; }
-  
-  
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldname = jp.getCurrentName();
-      jp.nextToken(); // move to value
-      switch (Enum.valueOf(EventFields.class, fieldname)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_TYPE:
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case STATUS:
-        status = jp.getText();
-        break;
-      case COUNTERS:
-        counters = EventReader.readCounters(jp);
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldname+"'!");
-      }
-    }
+  public Events.EventType getEventType() {
+    return Events.EventType.TASK_FINISHED;
   }
 
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(),
-        taskType.toString());
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeStringField(EventFields.STATUS.toString(), status);
-    EventWriter.writeCounters(counters, gen);
-    gen.writeEndObject();
-  }
+  
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java	(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java	Fri Sep 18 22:22:37 2009
@@ -22,27 +22,15 @@
 
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record the start of a task
  *
  */
 public class TaskStartedEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskType taskType;
-  private  long startTime;
-  private  String splitLocations;
-
-  enum EventFields { EVENT_CATEGORY,
-                     TASK_ID,
-                     TASK_TYPE,
-                     START_TIME,
-                     SPLIT_LOCATIONS }
+  private Events.TaskStarted datum = new Events.TaskStarted();
 
   /**
    * Create an event to record start of a task
@@ -53,71 +41,30 @@
    */
   public TaskStartedEvent(TaskID id, long startTime, 
       TaskType taskType, String splitLocations) {
-    this.taskid = id;
-    this.splitLocations = splitLocations;
-    this.startTime = startTime;
-    this.taskType = taskType;
-    this.category = EventCategory.TASK;
+    datum.taskid = new Utf8(id.toString());
+    datum.splitLocations = new Utf8(splitLocations);
+    datum.startTime = startTime;
+    datum.taskType = new Utf8(taskType.name());
   }
 
-  TaskStartedEvent() {
-  }
+  TaskStartedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) { this.datum = (Events.TaskStarted)datum; }
 
   /** Get the task id */
-  public TaskID getTaskId() { return taskid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the split locations, applicable for map tasks */
-  public String getSplitLocations() { return splitLocations; }
+  public String getSplitLocations() { return datum.splitLocations.toString(); }
   /** Get the start time of the task */
-  public long getStartTime() { return startTime; }
+  public long getStartTime() { return datum.startTime; }
   /** Get the task type */
-  public TaskType getTaskType() { return taskType; }
-  /** Get the event type */
-  public EventType getEventType() {
-    return EventType.TASK_STARTED;
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
   }
-
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_TYPE:
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case START_TIME:
-        startTime = jp.getLongValue();
-        break;
-      case SPLIT_LOCATIONS:
-        splitLocations = jp.getText();
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
+  /** Get the event type */
+  public Events.EventType getEventType() {
+    return Events.EventType.TASK_STARTED;
   }
 
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(),
-        taskType.toString());
-    gen.writeNumberField(EventFields.START_TIME.toString(), startTime);
-    gen.writeStringField(EventFields.SPLIT_LOCATIONS.toString(),
-        splitLocations);
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java	(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java	Fri Sep 18 22:22:37 2009
@@ -21,23 +21,15 @@
 import java.io.IOException;
 
 import org.apache.hadoop.mapreduce.TaskID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record updates to a task
  *
  */
 public class TaskUpdatedEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private  long finishTime;
-
-  enum EventFields { EVENT_CATEGORY,
-                     TASK_ID,
-                     FINISH_TIME }
+  private Events.TaskUpdated datum = new Events.TaskUpdated();
 
   /**
    * Create an event to record task updates
@@ -45,54 +37,22 @@
    * @param finishTime Finish time of the task
    */
   public TaskUpdatedEvent(TaskID id, long finishTime) {
-    this.taskid = id;
-    this.finishTime = finishTime;
-    this.category = EventCategory.TASK;
+    datum.taskid = new Utf8(id.toString());
+    datum.finishTime = finishTime;
   }
 
-  TaskUpdatedEvent() {
-  }
+  TaskUpdatedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) { this.datum = (Events.TaskUpdated)datum; }
+
   /** Get the task ID */
-  public TaskID getTaskId() { return taskid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the task finish time */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the event type */
-  public EventType getEventType() {
-    return EventType.TASK_UPDATED;
-  }
-
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
+  public Events.EventType getEventType() {
+    return Events.EventType.TASK_UPDATED;
   }
 
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeEndObject();
-  }
 }



Mime
View raw message