hadoop-mapreduce-commits mailing list archives

From: cutt...@apache.org
Subject: svn commit: r816801 [1/2] - in /hadoop/mapreduce/trunk: ./ ivy/ src/contrib/capacity-scheduler/ src/contrib/fairscheduler/ src/contrib/gridmix/ src/contrib/mrunit/ src/contrib/sqoop/ src/contrib/streaming/ src/java/org/apache/hadoop/mapreduce/ src/java...
Date: Fri, 18 Sep 2009 22:22:39 GMT
Author: cutting
Date: Fri Sep 18 22:22:37 2009
New Revision: 816801

URL: http://svn.apache.org/viewvc?rev=816801&view=rev
Log:
MAPREDUCE-980.  Modify JobHistory to use Avro for serialization.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
Removed:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventType.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/build.xml
    hadoop/mapreduce/trunk/ivy.xml
    hadoop/mapreduce/trunk/ivy/libraries.properties
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/mrunit/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
    hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counters.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/TaskUpdatedEvent.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Sep 18 22:22:37 2009
@@ -121,6 +121,8 @@
     MAPREDUCE-679. XML-based metrics as JSP servlet for JobTracker.
     (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-980. Modify JobHistory to use Avro for serialization. (cutting)
+
   IMPROVEMENTS
 
     MAPREDUCE-816. Rename "local" mysql import to "direct" in Sqoop.

Modified: hadoop/mapreduce/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/build.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/build.xml (original)
+++ hadoop/mapreduce/trunk/build.xml Fri Sep 18 22:22:37 2009
@@ -305,7 +305,19 @@
 
   </target>
 
-  <target name="compile-mapred-classes" depends="init">
+  <target name="avro-generate" depends="init">
+    <mkdir dir="${build.src}/org/apache/hadoop/mapreduce/jobhistory"/>
+    <taskdef name="protocol" classname="org.apache.avro.specific.ProtocolTask">
+      <classpath refid="classpath" />
+    </taskdef>
+    <protocol destdir="${build.src}">
+      <fileset dir="${mapred.src.dir}">
+	<include name="**/*.avpr" />
+      </fileset>
+    </protocol>
+  </target>
+
+  <target name="compile-mapred-classes" depends="init,avro-generate">
     <taskdef classname="org.apache.jasper.JspC" name="jsp-compile" >
        <classpath refid="classpath"/>
     </taskdef>

Modified: hadoop/mapreduce/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/ivy.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/ivy.xml (original)
+++ hadoop/mapreduce/trunk/ivy.xml Fri Sep 18 22:22:37 2009
@@ -271,7 +271,7 @@
     </dependency>
     <dependency org="org.apache.hadoop"
       name="avro"
-      rev="1.0.0"
+      rev="${avro.version}"
       conf="common->default"/>
     <dependency org="org.codehaus.jackson"
       name="jackson-mapper-asl"
@@ -283,7 +283,7 @@
       conf="common->default"/>
     <dependency org="com.thoughtworks.paranamer"
       name="paranamer"
-      rev="1.5"
+      rev="${paranamer.version}"
       conf="common->default"/>
     </dependencies>
   

Modified: hadoop/mapreduce/trunk/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/ivy/libraries.properties?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/ivy/libraries.properties (original)
+++ hadoop/mapreduce/trunk/ivy/libraries.properties Fri Sep 18 22:22:37 2009
@@ -16,6 +16,8 @@
 #These are the versions of our dependencies (in alphabetical order)
 apacheant.version=1.7.0
 
+avro.version=1.1.0
+
 checkstyle.version=4.2
 
 commons-cli.version=1.2
@@ -41,6 +43,8 @@
 #ivy.version=2.0.0-beta2
 ivy.version=2.0.0-rc2
 
+jackson.version=1.0.1
+
 jasper.version=5.5.12
 jsp.version=2.1
 jsp-api.version=5.5.12
@@ -59,6 +63,8 @@
 
 oro.version=2.0.8
 
+paranamer.version=1.5
+
 rats-lib.version=0.6
 
 servlet.version=4.0.6
@@ -69,5 +75,3 @@
 
 xmlenc.version=0.52
 xerces.version=1.4.4
-
-jackson.version=1.0.1

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/ivy.xml Fri Sep 18 22:22:37 2009
@@ -66,7 +66,7 @@
       conf="common->master"/> 
     <dependency org="org.apache.hadoop"
       name="avro"
-      rev="1.0.0"
+      rev="${avro.version}"
       conf="common->default"/>
     <dependency org="org.codehaus.jackson"
       name="jackson-mapper-asl"
@@ -76,5 +76,9 @@
       name="jackson-core-asl"
       rev="${jackson.version}"
       conf="common->default"/>
+    <dependency org="com.thoughtworks.paranamer"
+      name="paranamer"
+      rev="${paranamer.version}"
+      conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/ivy.xml Fri Sep 18 22:22:37 2009
@@ -38,13 +38,21 @@
       name="junit"
       rev="${junit.version}"
       conf="common->default"/>
+    <dependency org="org.apache.hadoop"
+      name="avro"
+      rev="${avro.version}"
+      conf="common->default"/>
     <dependency org="org.codehaus.jackson"
       name="jackson-mapper-asl"
       rev="${jackson.version}"
       conf="common->default"/>
-    <dependency org="org.codehaus.jackson"
-      name="jackson-core-asl"
-      rev="${jackson.version}"
+    <dependency org="com.thoughtworks.paranamer"
+      name="paranamer"
+      rev="${paranamer.version}"
+      conf="common->default"/>
+    <dependency org="com.thoughtworks.paranamer"
+      name="paranamer-ant"
+      rev="${paranamer.version}"
       conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/gridmix/ivy.xml Fri Sep 18 22:22:37 2009
@@ -87,7 +87,7 @@
       conf="common->default"/>
     <dependency org="org.apache.hadoop"
       name="avro"
-      rev="1.0.0"
+      rev="${avro.version}"
       conf="common->default"/>
     <dependency org="org.codehaus.jackson"
       name="jackson-mapper-asl"
@@ -97,5 +97,9 @@
       name="jackson-core-asl"
       rev="${jackson.version}"
       conf="common->default"/>
+    <dependency org="com.thoughtworks.paranamer"
+      name="paranamer"
+      rev="${paranamer.version}"
+      conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/contrib/mrunit/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mrunit/ivy.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mrunit/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/mrunit/ivy.xml Fri Sep 18 22:22:37 2009
@@ -54,7 +54,7 @@
       conf="common->master"/>
    <dependency org="org.apache.hadoop"
       name="avro"
-      rev="1.0.0"
+      rev="${avro.version}"
       conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/sqoop/ivy.xml Fri Sep 18 22:22:37 2009
@@ -78,7 +78,7 @@
       conf="common->master"/>
     <dependency org="org.apache.hadoop"
       name="avro"
-      rev="1.0.0"
+      rev="${avro.version}"
       conf="common->default"/>
     <dependency org="javax.servlet"
       name="servlet-api"
@@ -100,5 +100,9 @@
       name="jackson-core-asl"
       rev="${jackson.version}"
       conf="common->default"/>
+    <dependency org="com.thoughtworks.paranamer"
+      name="paranamer"
+      rev="${paranamer.version}"
+      conf="common->default"/>
     </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml (original)
+++ hadoop/mapreduce/trunk/src/contrib/streaming/ivy.xml Fri Sep 18 22:22:37 2009
@@ -66,7 +66,7 @@
       conf="common->master"/>
     <dependency org="org.apache.hadoop"
       name="avro"
-      rev="1.0.0"
+      rev="${avro.version}"
       conf="common->default"/>
     <dependency org="org.codehaus.jackson"
       name="jackson-mapper-asl"
@@ -76,5 +76,9 @@
       name="jackson-core-asl"
       rev="${jackson.version}"
       conf="common->default"/>
+    <dependency org="com.thoughtworks.paranamer"
+      name="paranamer"
+      rev="${paranamer.version}"
+      conf="common->default"/>
     </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counter.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counter.java Fri Sep 18 22:22:37 2009
@@ -50,6 +50,17 @@
     this.displayName = displayName;
   }
   
+  /** Create a counter.
+   * @param name the name within the group's enum.
+   * @param displayName a name to be displayed.
+   * @param value the counter value.
+   */
+  public Counter(String name, String displayName, long value) {
+    this.name = name;
+    this.displayName = displayName;
+    this.value = value;
+  }
+  
   @Deprecated
   protected synchronized void setDisplayName(String displayName) {
     this.displayName = displayName;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/CounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/CounterGroup.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/CounterGroup.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/CounterGroup.java Fri Sep 18 22:22:37 2009
@@ -60,7 +60,11 @@
     displayName = localize("CounterGroupName", name);
   }
   
-  protected CounterGroup(String name, String displayName) {
+  /** Create a CounterGroup.
+   * @param name the name of the group's enum.
+   * @param displayName a name to be displayed for the group.
+   */
+  public CounterGroup(String name, String displayName) {
     this.name = name;
     this.displayName = displayName;
   }
@@ -81,7 +85,8 @@
     return displayName;
   }
 
-  synchronized void addCounter(Counter counter) {
+  /** Add a counter to this group. */
+  public synchronized void addCounter(Counter counter) {
     counters.put(counter.getName(), counter);
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counters.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counters.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Counters.java Fri Sep 18 22:22:37 2009
@@ -41,6 +41,11 @@
     }
   }
 
+  /** Add a group. */
+  public void addGroup(CounterGroup group) {
+    groups.put(group.getName(), group);
+  }
+
   public Counter findCounter(String groupName, String counterName) {
     CounterGroup grp = getGroup(groupName);
     return grp.findCounter(counterName);

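Taken together, the counter changes above exist to support the Avro round trip: EventWriter.toAvro reads a Counters tree through its public iterators, and EventReader.fromAvro rebuilds one via the value-taking Counter constructor, the now-public CounterGroup constructor and addCounter, and the new Counters.addGroup. A minimal sketch of that reconstruction path, assumed to live in the jobhistory package alongside EventReader; the group and counter names are illustrative, not from this commit:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Counters;

    public class RebuildCounters {
      public static void main(String[] args) {
        Counters counters = new Counters();
        CounterGroup group = new CounterGroup("org.example.Demo", "Demo Counters");
        group.addCounter(new Counter("RECORDS_READ", "Records read", 42L));
        counters.addGroup(group);
        // Round-trip check: look the counter back up by group and name.
        System.out.println(
            counters.findCounter("org.example.Demo", "RECORDS_READ").getValue());
      }
    }
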
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Fri Sep 18 22:22:37 2009
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.jobhistory;
 
 import java.io.IOException;
+import java.io.EOFException;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -26,21 +27,19 @@
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
 
-public class EventReader {
-
-  static final JsonFactory FACTORY = new JsonFactory();
+import org.apache.avro.Schema;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
 
-  enum GroupFields { ID, NAME, LIST }
-  enum CounterFields { ID, NAME, VALUE }
-
-  private final JsonParser parser;
+public class EventReader {
+  private String version;
+  private Schema schema;
   private FSDataInputStream in;
-  
-  private String version = null;
+  private Decoder decoder;
+  private DatumReader reader;
 
   /**
    * Create a new Event Reader
@@ -57,58 +56,97 @@
    * @param in
    * @throws IOException
    */
+  @SuppressWarnings("deprecation")
   public EventReader(FSDataInputStream in) throws IOException {
     this.in = in;
-    parser = FACTORY.createJsonParser(in);
-    readVersionInfo();
-  }
-
-  private void readVersionInfo() throws IOException {
-    if (parser.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-    
-    parser.nextToken(); // Key
-    parser.nextToken(); // Value
+    this.version = in.readLine();
     
-    this.version = parser.getText();
+    if (!EventWriter.VERSION.equals(version))
+      throw new IOException("Incompatible event log version: "+version);
     
-    parser.nextToken(); // Consume the End Object
+    this.schema = Schema.parse(in.readLine());
+    this.reader =
+      new SpecificDatumReader(schema,
+                              "org.apache.hadoop.mapreduce.jobhistory.Events$");
+    this.decoder = new BinaryDecoder(in);
   }
   
   /**
-   * Return the current history version
-   */
-  public String getHistoryVersion() { return version; }
-  
-  /**
    * Get the next event from the stream
    * @return the next event
    * @throws IOException
    */
+  @SuppressWarnings("unchecked")
   public HistoryEvent getNextEvent() throws IOException {
-    EventType type = getHistoryEventType();
-
-    if (type == null) {
+    Events.Event wrapper;
+    try {
+      wrapper = (Events.Event)reader.read(null, decoder);
+    } catch (EOFException e) {
       return null;
     }
-
-    Class<? extends HistoryEvent> clazz = type.getKlass();
-
-    if (clazz == null) {
-      throw new IOException("CLass not known for " + type);
-    }
-
-    HistoryEvent ev = null;
-    try {
-      ev = clazz.newInstance();
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new IOException("Error Instantiating new object");
+    HistoryEvent result;
+    switch (wrapper.type) {
+    case JOB_SUBMITTED:
+      result = new JobSubmittedEvent(); break;
+    case JOB_INITED:
+      result = new JobInitedEvent(); break;
+    case JOB_FINISHED:
+      result = new JobFinishedEvent(); break;
+    case JOB_PRIORITY_CHANGED:
+      result = new JobPriorityChangeEvent(); break;
+    case JOB_STATUS_CHANGED:
+      result = new JobStatusChangedEvent(); break;
+    case JOB_FAILED:
+      result = new JobUnsuccessfulCompletionEvent(); break;
+    case JOB_KILLED:
+      result = new JobUnsuccessfulCompletionEvent(); break;
+    case JOB_INFO_CHANGED:
+      result = new JobInfoChangeEvent(); break;
+    case TASK_STARTED:
+      result = new TaskStartedEvent(); break;
+    case TASK_FINISHED:
+      result = new TaskFinishedEvent(); break;
+    case TASK_FAILED:
+      result = new TaskFailedEvent(); break;
+    case TASK_UPDATED:
+      result = new TaskUpdatedEvent(); break;
+    case MAP_ATTEMPT_STARTED:
+      result = new TaskAttemptStartedEvent(); break;
+    case MAP_ATTEMPT_FINISHED:
+      result = new MapAttemptFinishedEvent(); break;
+    case MAP_ATTEMPT_FAILED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case MAP_ATTEMPT_KILLED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case REDUCE_ATTEMPT_STARTED:
+      result = new TaskAttemptStartedEvent(); break;
+    case REDUCE_ATTEMPT_FINISHED:
+      result = new ReduceAttemptFinishedEvent(); break;
+    case REDUCE_ATTEMPT_FAILED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case REDUCE_ATTEMPT_KILLED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case SETUP_ATTEMPT_STARTED:
+      result = new TaskAttemptStartedEvent(); break;
+    case SETUP_ATTEMPT_FINISHED:
+      result = new TaskAttemptFinishedEvent(); break;
+    case SETUP_ATTEMPT_FAILED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case SETUP_ATTEMPT_KILLED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case CLEANUP_ATTEMPT_STARTED:
+      result = new TaskAttemptStartedEvent(); break;
+    case CLEANUP_ATTEMPT_FINISHED:
+      result = new TaskAttemptFinishedEvent(); break;
+    case CLEANUP_ATTEMPT_FAILED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case CLEANUP_ATTEMPT_KILLED:
+      result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    default:
+      throw new RuntimeException("unexpected event type!");
     }
-
-    ev.readFields(parser);
-    return ev;
+    result.setDatum(wrapper.event);
+    return result;
   }
 
   /**
@@ -122,97 +160,19 @@
     in = null;
   }
 
- 
-  /**
-   * Read the next JSON Object  to identify the event type.
-   * @param jp
-   * @return EventType
-   * @throws IOException
-   */
-  private EventType getHistoryEventType()
-  throws IOException {
-
-    if (parser.nextToken() == null) { // Verify the Start Object
-      return null; 
-    }
-
-    parser.nextToken();// Get the Event type
-
-    String fieldname = parser.getCurrentName();
-
-    if (!"EVENT_TYPE".equals(fieldname)) {
-      throw new IOException("Unexpected event type: " + fieldname);
-    }
-
-    parser.nextToken(); // Go to the value
-    String type = parser.getText();
-
-    parser.nextToken(); // Consume the end object
-
-    return Enum.valueOf(EventType.class, type);
-  }
-
-
-  static Counters readCounters(JsonParser jp) throws IOException {
-    Counters counters = new Counters();
-    while (jp.nextToken() !=JsonToken.END_ARRAY) {
-      readOneGroup(counters, jp);
-    }
-    return counters;
-  }
-
-  static void readOneGroup(Counters counters, JsonParser jp)
-  throws IOException {
-
-    jp.nextToken(); 
-
-    String fieldname = jp.getCurrentName();
-
-    if (!Enum.valueOf(GroupFields.class, fieldname).equals(GroupFields.ID)) {
-      throw new IOException("Internal error");
-    }
-    
-    jp.nextToken(); // Get the value
-    
-    CounterGroup grp = counters.getGroup(jp.getText());
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      fieldname = jp.getCurrentName();
-      jp.nextToken(); // move to value
-      switch(Enum.valueOf(GroupFields.class, fieldname)) {
-      case NAME: 
-        break;
-      case LIST: 
-        while (jp.nextToken() != JsonToken.END_ARRAY) {
-          readOneCounter(grp, jp);
-        }
-        break;
-      default:
-        throw new IOException("Unrecognized field '" + fieldname + "'!");
-      }
-    }    
-  }
-
-  static void readOneCounter(CounterGroup grp, JsonParser jp)
-  throws IOException {
-    String name = null;
-    String displayName = null;
-    long value = 0;
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldname = jp.getCurrentName();
-      jp.nextToken();
-      switch (Enum.valueOf(CounterFields.class, fieldname)) {
-      case ID: name = jp.getText(); break;
-      case NAME: displayName = jp.getText(); break;
-      case VALUE: value = jp.getLongValue(); break;
-      default:
-        throw new IOException("Unrecognized field '"+ fieldname + "'!");
+  static Counters fromAvro(Events.Counters counters) {
+    Counters result = new Counters();
+    for (Events.CounterGroup g : counters.groups) {
+      CounterGroup group =
+        new CounterGroup(g.name.toString(), g.displayName.toString());
+      for (Events.Counter c : g.counts) {
+        group.addCounter(new Counter(c.name.toString(),
+                                     c.displayName.toString(),
+                                     c.value));
       }
+      result.addGroup(group);
     }
-    
-    Counter ctr = grp.findCounter(name, displayName);
-    ctr.increment(value);
+    return result;
   }
 
 }

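With the JSON framing gone, consuming a history log is a loop over getNextEvent(), which now maps end-of-stream (EOFException) to null. A usage sketch, assumed to sit in the jobhistory package so EventReader and HistoryEvent resolve, and assuming EventReader's existing close() method (the one above that nulls the stream):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PrintHistory {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path(args[0])); // path to an event log
        EventReader reader = new EventReader(in);
        HistoryEvent event;
        while ((event = reader.getNextEvent()) != null) { // null at end of log
          System.out.println(event.getEventType());
        }
        reader.close();
      }
    }
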
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Fri Sep 18 22:22:37 2009
@@ -25,11 +25,14 @@
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.CounterGroup;
 import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.jobhistory.EventReader.CounterFields;
-import org.apache.hadoop.mapreduce.jobhistory.EventReader.GroupFields;
-import org.codehaus.jackson.JsonEncoding;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.io.Encoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
 
 /**
  * Event Writer is an utility class used to write events to the underlying
@@ -38,91 +41,67 @@
  * 
  */
 class EventWriter {
+  static final String VERSION = "Avro-Binary";
 
-  static final JsonFactory FACTORY = new JsonFactory();
-  private JsonGenerator gen; 
+  private FSDataOutputStream out;
+  private DatumWriter<Object> writer =
+    new SpecificDatumWriter(Events.Event._SCHEMA);
+  private Encoder encoder;
   
   EventWriter(FSDataOutputStream out) throws IOException {
-    gen = FACTORY.createJsonGenerator(out, JsonEncoding.UTF8);
-    // Prefix all log files with the version
-    writeVersionInfo();
-  }
-  
-  private void writeVersionInfo() throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField("HISTORY_VERSION", JobHistory.HISTORY_VERSION);
-    gen.writeEndObject();
-    gen.writeRaw("\n");
+    this.out = out;
+    out.writeBytes(VERSION);
+    out.writeBytes("\n");
+    out.writeBytes(Events.Event._SCHEMA.toString());
+    out.writeBytes("\n");
+    this.encoder = new BinaryEncoder(out);
   }
   
-  synchronized void write(HistoryEvent event)
-  throws IOException { 
-    writeEventType(gen, event.getEventType());
-    event.writeFields(gen);
-    gen.writeRaw("\n");
+  synchronized void write(HistoryEvent event) throws IOException { 
+    Events.Event wrapper = new Events.Event();
+    wrapper.type = event.getEventType();
+    wrapper.event = event.getDatum();
+    writer.write(wrapper, encoder);
   }
   
   void flush() throws IOException { 
-    gen.flush();
+    encoder.flush();
   }
 
   void close() throws IOException {
-    gen.close();
-  }
-  
-  /**
-   * Write the event type to the JsonGenerator
-   * @param gen
-   * @param type
-   * @throws IOException
-   */
-  private void writeEventType(JsonGenerator gen, EventType type) 
-  throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField("EVENT_TYPE", type.toString());
-    gen.writeEndObject();
-  }  
-  
-  static void writeCounters(Counters counters, JsonGenerator gen)
-  throws IOException {
-    writeCounters("COUNTERS", counters, gen);
+    encoder.flush();
+    out.close();
   }
-  
-  static void writeCounters(String name, Counters counters, JsonGenerator gen)
-  throws IOException {
-    gen.writeFieldName(name);
-    gen.writeStartArray(); // Start of all groups
-    Iterator<CounterGroup> groupItr = counters.iterator();
-    while (groupItr.hasNext()) {
-      writeOneGroup(gen, groupItr.next());
-    }
-    gen.writeEndArray(); // End of all groups
+
+  private static final Schema GROUPS =
+    Schema.createArray(Events.CounterGroup._SCHEMA);
+
+  private static final Schema COUNTERS =
+    Schema.createArray(Events.Counter._SCHEMA);
+
+  static Events.Counters toAvro(Counters counters) {
+    return toAvro(counters, "COUNTERS");
   }
-  
-  static void writeOneGroup (JsonGenerator gen, CounterGroup grp)
-  throws IOException {
-    gen.writeStartObject(); // Start of this group
-    gen.writeStringField(GroupFields.ID.toString(), grp.getName());
-    gen.writeStringField(GroupFields.NAME.toString(), grp.getDisplayName());
-  
-    // Write out the List of counters
-    gen.writeFieldName(GroupFields.LIST.toString());
-    gen.writeStartArray(); // Start array of counters
-    Iterator<Counter> ctrItr = grp.iterator();
-    while (ctrItr.hasNext()) {
-      writeOneCounter(gen, ctrItr.next());
+  static Events.Counters toAvro(Counters counters, String name) {
+    Events.Counters result = new Events.Counters();
+    result.name = new Utf8(name);
+    result.groups = new GenericData.Array<Events.CounterGroup>(0, GROUPS);
+    if (counters == null) return result;
+    for (CounterGroup group : counters) {
+      Events.CounterGroup g = new Events.CounterGroup();
+      g.name = new Utf8(group.getName());
+      g.displayName = new Utf8(group.getDisplayName());
+      g.counts = new GenericData.Array<Events.Counter>(group.size(), COUNTERS);
+      for (Counter counter : group) {
+        Events.Counter c = new Events.Counter();
+        c.name = new Utf8(counter.getName());
+        c.displayName = new Utf8(counter.getDisplayName());
+        c.value = counter.getValue();
+        g.counts.add(c);
+      }
+      result.groups.add(g);
     }
-    gen.writeEndArray(); // End of all counters
-
-    gen.writeEndObject(); // End of this group
+    return result;
   }
 
-  static void writeOneCounter(JsonGenerator gen, Counter ctr)
-  throws IOException{
-    gen.writeStartObject();
-    gen.writeStringField(CounterFields.ID.toString(), ctr.getName());
-    gen.writeStringField(CounterFields.NAME.toString(), ctr.getDisplayName());
-    gen.writeNumberField("VALUE", ctr.getValue());
-    gen.writeEndObject();
-  }
 }

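The constructor above fixes the new on-disk layout: one text line with the version marker "Avro-Binary", one text line with the JSON of the Event union schema, then back-to-back Avro binary records until EOF. Since the schema travels in the header, a log can in principle be dumped with the generic Avro API alone, without the generated Events classes. A sketch under that assumption, reading a local file and using the deprecated readLine() just as EventReader does:

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.Decoder;

    public class DumpHistory {
      public static void main(String[] args) throws Exception {
        DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
        String version = in.readLine();              // "Avro-Binary"
        if (!"Avro-Binary".equals(version))
          throw new IOException("Incompatible event log version: " + version);
        Schema schema = Schema.parse(in.readLine()); // embedded Event schema
        GenericDatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Decoder decoder = new BinaryDecoder(in);
        try {
          while (true)
            System.out.println(reader.read(null, decoder)); // one Event per record
        } catch (EOFException e) {
          // end of log
        }
      }
    }
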
Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=816801&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Fri Sep 18 22:22:37 2009
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"namespace": "org.apache.hadoop.mapreduce.jobhistory",
+ "protocol": "Events",
+
+ "types": [
+
+     {"type": "record", "name": "Counter",
+      "fields": [
+          {"name": "name", "type": "string"},
+          {"name": "displayName", "type": "string"},
+          {"name": "value", "type": "long"}
+      ]
+     },
+
+     {"type": "record", "name": "CounterGroup",
+      "fields": [
+          {"name": "name", "type": "string"},
+          {"name": "displayName", "type": "string"},
+          {"name": "counts", "type": {"type": "array", "items": "Counter"}}
+      ]
+     },
+
+     {"type": "record", "name": "Counters",
+      "fields": [
+          {"name": "name", "type": "string"},
+          {"name": "groups", "type": {"type": "array", "items": "CounterGroup"}}
+      ]
+     },
+
+     {"type": "record", "name": "JobFinished",
+      "fields": [
+          {"name": "jobid", "type": "string"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "finishedMaps", "type": "int"},
+          {"name": "finishedReduces", "type": "int"},
+          {"name": "failedMaps", "type": "int"},
+          {"name": "failedReduces", "type": "int"},
+          {"name": "totalCounters", "type": "Counters"},
+          {"name": "mapCounters", "type": "Counters"},
+          {"name": "reduceCounters", "type": "Counters"}
+      ]
+     },
+
+     {"type": "record", "name": "JobInited",
+      "fields": [
+          {"name": "jobid", "type": "string"},
+          {"name": "launchTime", "type": "long"},
+          {"name": "totalMaps", "type": "int"},
+          {"name": "totalReduces", "type": "int"},
+          {"name": "jobStatus", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "JobSubmitted",
+      "fields": [
+          {"name": "jobid", "type": "string"},
+          {"name": "jobName", "type": "string"},
+          {"name": "userName", "type": "string"},
+          {"name": "submitTime", "type": "long"},
+          {"name": "jobConfPath", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "JobInfoChange",
+      "fields": [
+          {"name": "jobid", "type": "string"},
+          {"name": "submitTime", "type": "long"},
+          {"name": "launchTime", "type": "long"}
+      ]
+     },
+
+     {"type": "record", "name": "JobPriorityChange",
+      "fields": [
+          {"name": "jobid", "type": "string"},
+          {"name": "priority", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "JobStatusChanged",
+      "fields": [
+          {"name": "jobid", "type": "string"},
+          {"name": "jobStatus", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "JobUnsuccessfulCompletion",
+      "fields": [
+          {"name": "jobid", "type": "string"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "finishedMaps", "type": "int"},
+          {"name": "finishedReduces", "type": "int"},
+          {"name": "jobStatus", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "MapAttemptFinished",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "attemptId", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "taskStatus", "type": "string"},
+          {"name": "mapFinishTime", "type": "long"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "hostname", "type": "string"},
+          {"name": "state", "type": "string"},
+          {"name": "counters", "type": "Counters"}
+      ]
+     },
+
+     {"type": "record", "name": "ReduceAttemptFinished",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "attemptId", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "taskStatus", "type": "string"},
+          {"name": "shuffleFinishTime", "type": "long"},
+          {"name": "sortFinishTime", "type": "long"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "hostname", "type": "string"},
+          {"name": "state", "type": "string"},
+          {"name": "counters", "type": "Counters"}
+      ]
+     },
+
+     {"type": "record", "name": "TaskAttemptFinished",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "attemptId", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "taskStatus", "type": "string"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "hostname", "type": "string"},
+          {"name": "state", "type": "string"},
+          {"name": "counters", "type": "Counters"}
+      ]
+     },
+
+     {"type": "record", "name": "TaskAttemptStarted",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "attemptId", "type": "string"},
+          {"name": "startTime", "type": "long"},
+          {"name": "trackerName", "type": "string"},
+          {"name": "httpPort", "type": "int"}
+      ]
+     },
+
+     {"type": "record", "name": "TaskAttemptUnsuccessfulCompletion",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "attemptId", "type": "string"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "hostname", "type": "string"},
+          {"name": "status", "type": "string"},
+          {"name": "error", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "TaskFailed",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "error", "type": "string"},
+          {"name": "failedDueToAttempt", "type": ["null", "string"] },
+          {"name": "status", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "TaskFinished",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "finishTime", "type": "long"},
+          {"name": "status", "type": "string"},
+          {"name": "counters", "type": "Counters"}
+      ]
+     },
+
+     {"type": "record", "name": "TaskStarted",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "taskType", "type": "string"},
+          {"name": "startTime", "type": "long"},
+          {"name": "splitLocations", "type": "string"}
+      ]
+     },
+
+     {"type": "record", "name": "TaskUpdated",
+      "fields": [
+          {"name": "taskid", "type": "string"},
+          {"name": "finishTime", "type": "long"}
+      ]
+     },
+
+     {"type": "enum", "name": "EventType",
+      "symbols": [
+          "JOB_SUBMITTED",
+          "JOB_INITED",
+          "JOB_FINISHED",
+          "JOB_PRIORITY_CHANGED",
+          "JOB_STATUS_CHANGED",
+          "JOB_FAILED",
+          "JOB_KILLED",
+          "JOB_INFO_CHANGED",
+          "TASK_STARTED",
+          "TASK_FINISHED",
+          "TASK_FAILED",
+          "TASK_UPDATED",
+          "MAP_ATTEMPT_STARTED",
+          "MAP_ATTEMPT_FINISHED",
+          "MAP_ATTEMPT_FAILED",
+          "MAP_ATTEMPT_KILLED",
+          "REDUCE_ATTEMPT_STARTED",
+          "REDUCE_ATTEMPT_FINISHED",
+          "REDUCE_ATTEMPT_FAILED",
+          "REDUCE_ATTEMPT_KILLED",
+          "SETUP_ATTEMPT_STARTED",
+          "SETUP_ATTEMPT_FINISHED",
+          "SETUP_ATTEMPT_FAILED",
+          "SETUP_ATTEMPT_KILLED",
+          "CLEANUP_ATTEMPT_STARTED",
+          "CLEANUP_ATTEMPT_FINISHED",
+          "CLEANUP_ATTEMPT_FAILED",
+          "CLEANUP_ATTEMPT_KILLED"
+          ]
+     },
+
+     {"type": "record", "name": "Event",
+      "fields": [
+          {"name": "type", "type": "EventType"},
+          {"name": "event",
+           "type": [
+               "JobFinished",
+               "JobInfoChange",
+               "JobInited",
+               "JobPriorityChange",
+               "JobStatusChanged",
+               "JobSubmitted",
+               "JobUnsuccessfulCompletion",
+               "MapAttemptFinished",
+               "ReduceAttemptFinished",
+               "TaskAttemptFinished",
+               "TaskAttemptStarted",
+               "TaskAttemptUnsuccessfulCompletion",
+               "TaskFailed",
+               "TaskFinished",
+               "TaskStarted",
+               "TaskUpdated"
+               ]
+          }
+      ]
+     }
+
+ ],
+
+ "messages": {}
+}

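The avro-generate target added to build.xml runs this protocol through Avro's ProtocolTask, producing an Events class whose inner classes (Events.Event, Events.EventType, Events.Counter, and so on) carry the fields above as public members; the "org.apache.hadoop.mapreduce.jobhistory.Events$" prefix handed to SpecificDatumReader is what resolves union branches to those inner classes. Spelling out what EventWriter does for a single event against the generated types, in a sketch assumed to sit in the jobhistory package; the task id and file name are illustrative:

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.Encoder;
    import org.apache.avro.specific.SpecificDatumWriter;
    import org.apache.avro.util.Utf8;

    public class WriteOneEvent {
      public static void main(String[] args) throws Exception {
        DataOutputStream out =
          new DataOutputStream(new FileOutputStream("demo.history"));
        out.writeBytes("Avro-Binary\n");                        // version line
        out.writeBytes(Events.Event._SCHEMA.toString() + "\n"); // schema line
        Events.Event wrapper = new Events.Event();
        wrapper.type = Events.EventType.TASK_UPDATED;
        Events.TaskUpdated datum = new Events.TaskUpdated();
        datum.taskid = new Utf8("task_200909182222_0001_m_000000");
        datum.finishTime = System.currentTimeMillis();
        wrapper.event = datum;                                  // union branch
        DatumWriter<Object> writer = new SpecificDatumWriter(Events.Event._SCHEMA);
        Encoder encoder = new BinaryEncoder(out);
        writer.write(wrapper, encoder);
        encoder.flush();
        out.close();
      }
    }
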
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java Fri Sep 18 22:22:37 2009
@@ -20,37 +20,18 @@
 
 import java.io.IOException;
 
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-
 /**
- * The interface all job history events implement
- *
+ * Interface for event wrapper classes.  Implementations each wrap an
+ * Avro-generated class, adding constructors and accessor methods.
  */
 public interface HistoryEvent {
 
-  // The category that history event belongs to
-  enum EventCategory {
-    JOB, TASK, TASK_ATTEMPT
-  }
-  
-  /**
-   * Serialize the Fields of the event to the JsonGenerator
-   * @param gen JsonGenerator to write to
-   * @throws IOException
-   */
-  void writeFields (JsonGenerator gen) throws IOException;
-  
-  /**
-   * Deserialize the fields of the event from the JsonParser
-   * @param parser JsonParser to read from
-   * @throws IOException
-   */
-  void readFields(JsonParser parser) throws IOException;
-  
-  /** Return the event type */
-  EventType getEventType();
-  
-  /** Retun the event category */
-  EventCategory getEventCategory();
+  /** Return this event's type. */
+  Events.EventType getEventType();
+
+  /** Return the Avro datum wrapped by this. */
+  Object getDatum();
+
+  /** Set the Avro datum wrapped by this. */
+  void setDatum(Object datum);
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobFinishedEvent.java Fri Sep 18 22:22:37 2009
@@ -22,39 +22,15 @@
 
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
+
 /**
  * Event to record successful completion of job
  *
  */
 public class JobFinishedEvent  implements HistoryEvent {
-
-  private EventCategory category;
-  private JobID jobid;
-  private long finishTime;
-  private int finishedMaps;
-  private int finishedReduces;
-  private int failedMaps;
-  private int failedReduces;
-  private Counters totalCounters;
-  private Counters mapCounters;
-  private Counters reduceCounters;
-
-  enum EventFields { EVENT_CATEGORY,
-    JOB_ID,
-    FINISH_TIME,
-    FINISHED_MAPS,
-    FINISHED_REDUCES,
-    FAILED_MAPS,
-    FAILED_REDUCES,
-    MAP_COUNTERS,
-    REDUCE_COUNTERS,
-    TOTAL_COUNTERS }
-
-  JobFinishedEvent() {
-  }
+  private Events.JobFinished datum = new Events.JobFinished();
 
   /** 
    * Create an event to record successful job completion
@@ -73,106 +49,50 @@
       int failedMaps, int failedReduces,
       Counters mapCounters, Counters reduceCounters,
       Counters totalCounters) {
-    this.jobid = id;
-    this.finishTime = finishTime;
-    this.finishedMaps = finishedMaps;
-    this.finishedReduces = finishedReduces;
-    this.failedMaps = failedMaps;
-    this.failedReduces = failedReduces;
-    this.mapCounters = mapCounters;
-    this.reduceCounters = reduceCounters;
-    this.totalCounters = totalCounters;
-    this.category = EventCategory.JOB;
+    datum.jobid = new Utf8(id.toString());
+    datum.finishTime = finishTime;
+    datum.finishedMaps = finishedMaps;
+    datum.finishedReduces = finishedReduces;
+    datum.failedMaps = failedMaps;
+    datum.failedReduces = failedReduces;
+    datum.mapCounters =
+      EventWriter.toAvro(mapCounters, "MAP_COUNTERS");
+    datum.reduceCounters =
+      EventWriter.toAvro(reduceCounters, "REDUCE_COUNTERS");
+    datum.totalCounters =
+      EventWriter.toAvro(totalCounters, "TOTAL_COUNTERS");
+  }
+
+  JobFinishedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) { this.datum = (Events.JobFinished)datum; }
+  public Events.EventType getEventType() {
+    return Events.EventType.JOB_FINISHED;
   }
 
-  /** Get the Event Category */
-  public EventCategory getEventCategory() { return category; }
   /** Get the Job ID */
-  public JobID getJobid() { return jobid; }
+  public JobID getJobid() { return JobID.forName(datum.jobid.toString()); }
   /** Get the job finish time */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the number of finished maps for the job */
-  public int getFinishedMaps() { return finishedMaps; }
+  public int getFinishedMaps() { return datum.finishedMaps; }
   /** Get the number of finished reducers for the job */
-  public int getFinishedReduces() { return finishedReduces; }
+  public int getFinishedReduces() { return datum.finishedReduces; }
   /** Get the number of failed maps for the job */
-  public int getFailedMaps() { return failedMaps; }
+  public int getFailedMaps() { return datum.failedMaps; }
   /** Get the number of failed reducers for the job */
-  public int getFailedReduces() { return failedReduces; }
+  public int getFailedReduces() { return datum.failedReduces; }
   /** Get the counters for the job */
-  public Counters getTotalCounters() { return totalCounters; }
-  /** Get the Map counters for the job */
-  public Counters getMapCounters() { return mapCounters; }
-  /** Get the reduce counters for the job */
-  public Counters getReduceCounters() { return reduceCounters; }
-  /** Get the event type */
-  public EventType getEventType() { 
-    return EventType.JOB_FINISHED;
+  public Counters getTotalCounters() {
+    return EventReader.fromAvro(datum.totalCounters);
   }
-
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected token while reading");
-    }
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldname = jp.getCurrentName();
-      jp.nextToken(); // move to value
-      switch (Enum.valueOf(EventFields.class, fieldname)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case JOB_ID:
-        jobid = JobID.forName(jp.getText());
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case FINISHED_MAPS:
-        finishedMaps = jp.getIntValue();
-        break;
-      case FINISHED_REDUCES:
-        finishedReduces = jp.getIntValue();
-        break;
-      case FAILED_MAPS:
-        failedMaps = jp.getIntValue();
-        break;
-      case FAILED_REDUCES:
-        failedReduces = jp.getIntValue();
-        break;
-      case MAP_COUNTERS:
-        mapCounters = EventReader.readCounters(jp);
-        break;
-      case REDUCE_COUNTERS:
-        reduceCounters = EventReader.readCounters(jp);
-        break;
-      case TOTAL_COUNTERS:
-        totalCounters = EventReader.readCounters(jp);
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldname+"'!");
-      }
-    }
+  /** Get the Map counters for the job */
+  public Counters getMapCounters() {
+    return EventReader.fromAvro(datum.mapCounters);
   }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeNumberField(EventFields.FINISHED_MAPS.toString(), finishedMaps);
-    gen.writeNumberField(EventFields.FINISHED_REDUCES.toString(), 
-        finishedReduces);
-    gen.writeNumberField(EventFields.FAILED_MAPS.toString(), failedMaps);
-    gen.writeNumberField(EventFields.FAILED_REDUCES.toString(),
-        failedReduces);
-    EventWriter.writeCounters(EventFields.MAP_COUNTERS.toString(),
-        mapCounters, gen);
-    EventWriter.writeCounters(EventFields.REDUCE_COUNTERS.toString(),
-        reduceCounters, gen);
-    EventWriter.writeCounters(EventFields.TOTAL_COUNTERS.toString(),
-        totalCounters, gen);
-    gen.writeEndObject();
+  /** Get the reduce counters for the job */
+  public Counters getReduceCounters() {
+    return EventReader.fromAvro(datum.reduceCounters);
   }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Sep 18 22:22:37 2009
@@ -65,7 +65,7 @@
    */
   public JobHistoryParser(FileSystem fs, Path historyFile) 
   throws IOException {
-    in = fs.open(historyFile);
+    this(fs.open(historyFile));
   }
   
   /**
@@ -105,7 +105,7 @@
   }
   
   private void handleEvent(HistoryEvent event) throws IOException { 
-    EventType type = event.getEventType();
+    Events.EventType type = event.getEventType();
 
     switch (type) {
     case JOB_SUBMITTED:

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInfoChangeEvent.java Fri Sep 18 22:22:37 2009
@@ -21,25 +21,15 @@
 import java.io.IOException;
 
 import org.apache.hadoop.mapreduce.JobID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record changes in the submit and launch time of
  * a job
  */
 public class JobInfoChangeEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private JobID jobid;
-  private  long submitTime;
-  private  long launchTime;
-
-  enum EventFields { EVENT_CATEGORY,
-                     JOB_ID,
-                     SUBMIT_TIME,
-                     LAUNCH_TIME }
+  private Events.JobInfoChange datum = new Events.JobInfoChange();
 
   /** 
    * Create a event to record the submit and launch time of a job
@@ -48,61 +38,27 @@
    * @param launchTime Launch time of the job
    */
   public JobInfoChangeEvent(JobID id, long submitTime, long launchTime) {
-    this.jobid = id;
-    this.submitTime = submitTime;
-    this.launchTime = launchTime;
-    this.category = EventCategory.JOB;
+    datum.jobid = new Utf8(id.toString());
+    datum.submitTime = submitTime;
+    datum.launchTime = launchTime;
   }
 
   JobInfoChangeEvent() { }
 
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.JobInfoChange)datum;
+  }
+
   /** Get the Job ID */
-  public JobID getJobId() { return jobid; }
+  public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the Job submit time */
-  public long getSubmitTime() { return submitTime; }
+  public long getSubmitTime() { return datum.submitTime; }
   /** Get the Job launch time */
-  public long getLaunchTime() { return launchTime; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
-  /** Get the event type */
-  public EventType getEventType() {
-    return EventType.JOB_INFO_CHANGED;
-  }
+  public long getLaunchTime() { return datum.launchTime; }
 
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-        case EVENT_CATEGORY: 
-          category = Enum.valueOf(EventCategory.class, jp.getText());
-          break;
-        case JOB_ID:
-          jobid = JobID.forName(jp.getText());
-          break;
-        case SUBMIT_TIME:
-          submitTime = jp.getLongValue();
-          break;
-        case LAUNCH_TIME:
-          launchTime = jp.getLongValue();
-          break;
-        default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
+  public Events.EventType getEventType() {
+    return Events.EventType.JOB_INFO_CHANGED;
   }
 
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-                         category.toString());
-    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
-    gen.writeNumberField(EventFields.SUBMIT_TIME.toString(), submitTime);
-    gen.writeNumberField(EventFields.LAUNCH_TIME.toString(), launchTime);
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Fri Sep 18 22:22:37 2009
@@ -21,100 +21,51 @@
 import java.io.IOException;
 
 import org.apache.hadoop.mapreduce.JobID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record the initialization of a job
  *
  */
 public class JobInitedEvent implements HistoryEvent {
+  private Events.JobInited datum = new Events.JobInited();
 
-  private EventCategory category;
-  private JobID jobid;
-  private  long launchTime;
-  private  int totalMaps;
-  private  int totalReduces;
-  private  String jobStatus;
-
-  enum EventFields { EVENT_CATEGORY,
-                     JOB_ID,
-                     LAUNCH_TIME,
-                     TOTAL_MAPS,
-                     TOTAL_REDUCES,
-                     JOB_STATUS }
-/**
- * Create an event to record job initialization
- * @param id
- * @param launchTime
- * @param totalMaps
- * @param totalReduces
- * @param jobStatus
- */
-public JobInitedEvent(JobID id, long launchTime, int totalMaps,
-      int totalReduces, String jobStatus) {
-    this.jobid = id;
-    this.launchTime = launchTime;
-    this.totalMaps = totalMaps;
-    this.totalReduces = totalReduces;
-    this.jobStatus = jobStatus;
-    this.category = EventCategory.JOB;
+  /**
+   * Create an event to record job initialization
+   * @param id
+   * @param launchTime
+   * @param totalMaps
+   * @param totalReduces
+   * @param jobStatus
+   */
+  public JobInitedEvent(JobID id, long launchTime, int totalMaps,
+                        int totalReduces, String jobStatus) {
+    datum.jobid = new Utf8(id.toString());
+    datum.launchTime = launchTime;
+    datum.totalMaps = totalMaps;
+    datum.totalReduces = totalReduces;
+    datum.jobStatus = new Utf8(jobStatus);
   }
 
   JobInitedEvent() { }
 
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) { this.datum = (Events.JobInited)datum; }
+
   /** Get the job ID */
-  public JobID getJobId() { return jobid; }
+  public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the launch time */
-  public long getLaunchTime() { return launchTime; }
+  public long getLaunchTime() { return datum.launchTime; }
   /** Get the total number of maps */
-  public int getTotalMaps() { return totalMaps; }
+  public int getTotalMaps() { return datum.totalMaps; }
   /** Get the total number of reduces */
-  public int getTotalReduces() { return totalReduces; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public int getTotalReduces() { return datum.totalReduces; }
   /** Get the status */
-  public String getStatus() { return jobStatus; }
+  public String getStatus() { return datum.jobStatus.toString(); }
  /** Get the event type */
-  public EventType getEventType() {
-    return EventType.JOB_INITED;
+  public Events.EventType getEventType() {
+    return Events.EventType.JOB_INITED;
   }
 
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading, +" +
-      		" expected a Start Object");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-        case EVENT_CATEGORY: 
-          category = Enum.valueOf(EventCategory.class, jp.getText());
-          break;
-        case JOB_ID: jobid = JobID.forName(jp.getText()); break;
-        case LAUNCH_TIME: launchTime = jp.getLongValue(); break;
-        case TOTAL_MAPS: totalMaps = jp.getIntValue(); break;
-        case TOTAL_REDUCES: totalReduces =  jp.getIntValue(); break;
-        case JOB_STATUS: jobStatus = jp.getText(); break;
-        default: 
-        throw new IOException("Unrecognized field '"+ fieldName + "'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
-    gen.writeNumberField(EventFields.LAUNCH_TIME.toString(), launchTime);
-    gen.writeNumberField(EventFields.TOTAL_MAPS.toString(), totalMaps);
-    gen.writeNumberField(EventFields.TOTAL_REDUCES.toString(), totalReduces);
-    gen.writeStringField(EventFields.JOB_STATUS.toString(),
-        jobStatus);
-    gen.writeEndObject();
-  }
 }
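
[A hedged usage sketch of the rewritten event, not part of the commit: construction writes every argument into the Avro datum, and the getters read it straight back out. The job id, task counts, and status string are invented:

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;

    public class JobInitedSketch {
      public static void main(String[] args) {
        JobID id = JobID.forName("job_200909181530_0001");   // invented id
        JobInitedEvent event =
            new JobInitedEvent(id, System.currentTimeMillis(), 10, 2, "PREP");
        // Getters read directly from the Avro datum:
        System.out.println(event.getTotalMaps());    // 10
        System.out.println(event.getStatus());       // PREP
        System.out.println(event.getEventType());    // JOB_INITED
      }
    }
]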

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobPriorityChangeEvent.java Fri Sep 18 22:22:37 2009
@@ -22,79 +22,41 @@
 
 import org.apache.hadoop.mapred.JobPriority;
 import org.apache.hadoop.mapreduce.JobID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record the change of priority of a job
  *
  */
 public class JobPriorityChangeEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private JobID jobid;
-  private JobPriority priority;
-
-
-  enum EventFields { EVENT_CATEGORY,
-    JOB_ID,
-    PRIORITY }
+  private Events.JobPriorityChange datum = new Events.JobPriorityChange();
 
   /** Generate an event to record changes in Job priority
    * @param id Job Id
    * @param priority The new priority of the job
    */
   public JobPriorityChangeEvent(JobID id, JobPriority priority) {
-    this.jobid = id;
-    this.priority = priority;
-    this.category = EventCategory.JOB;
+    datum.jobid = new Utf8(id.toString());
+    datum.priority = new Utf8(priority.name());
   }
 
   JobPriorityChangeEvent() { }
 
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.JobPriorityChange)datum;
+  }
+
   /** Get the Job ID */
-  public JobID getJobId() { return jobid; }
+  public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the job priority */
-  public JobPriority getPriority() { return priority; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
-  /** Get the event type */
-  public EventType getEventType() {
-    return EventType.JOB_PRIORITY_CHANGED;
+  public JobPriority getPriority() {
+    return JobPriority.valueOf(datum.priority.toString());
   }
-
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY: 
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case JOB_ID:
-        jobid = JobID.forName(jp.getText());
-        break;
-      case PRIORITY: 
-        priority = Enum.valueOf(JobPriority.class, jp.getText());
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
+  /** Get the event type */
+  public Events.EventType getEventType() {
+    return Events.EventType.JOB_PRIORITY_CHANGED;
   }
 
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
-    gen.writeStringField(EventFields.PRIORITY.toString(), 
-        priority.toString());
-    gen.writeEndObject();
-  }
 }
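
[Enums cross the Avro boundary as their name() string and are rebuilt with valueOf, as the constructor and getter above show. A small sketch of that round trip, assuming the standard JobPriority values; not code from the commit:

    import org.apache.avro.util.Utf8;
    import org.apache.hadoop.mapred.JobPriority;

    public class PrioritySketch {
      public static void main(String[] args) {
        Utf8 wire = new Utf8(JobPriority.HIGH.name());   // as the constructor stores it
        JobPriority back = JobPriority.valueOf(wire.toString());
        System.out.println(back == JobPriority.HIGH);    // true
      }
    }
]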

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobStatusChangedEvent.java Fri Sep 18 22:22:37 2009
@@ -21,23 +21,15 @@
 import java.io.IOException;
 
 import org.apache.hadoop.mapreduce.JobID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record the change of status for a job
  *
  */
 public class JobStatusChangedEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private JobID jobid;
-  private  String jobStatus;
-
-  enum EventFields { EVENT_CATEGORY,
-    JOB_ID,
-    JOB_STATUS }
+  private Events.JobStatusChanged datum = new Events.JobStatusChanged();
 
   /**
    * Create an event to record the change in the Job Status
@@ -45,55 +37,24 @@
    * @param jobStatus The new job status
    */
   public JobStatusChangedEvent(JobID id, String jobStatus) {
-    this.jobid = id;
-    this.jobStatus = jobStatus;
-    this.category = EventCategory.JOB;
+    datum.jobid = new Utf8(id.toString());
+    datum.jobStatus = new Utf8(jobStatus);
   }
 
-  JobStatusChangedEvent() {
+  JobStatusChangedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.JobStatusChanged)datum;
   }
 
   /** Get the Job Id */
-  public JobID getJobId() { return jobid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the event status */
-  public String getStatus() { return jobStatus; }
+  public String getStatus() { return datum.jobStatus.toString(); }
   /** Get the event type */
-  public EventType getEventType() {
-    return EventType.JOB_STATUS_CHANGED;
+  public Events.EventType getEventType() {
+    return Events.EventType.JOB_STATUS_CHANGED;
   }
 
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case JOB_ID:
-        jobid = JobID.forName(jp.getText());
-        break;
-      case JOB_STATUS:
-        jobStatus = jp.getText();
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
-    gen.writeStringField(EventFields.JOB_STATUS.toString(), jobStatus);
-    gen.writeEndObject();
-  }
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobSubmittedEvent.java Fri Sep 18 22:22:37 2009
@@ -21,29 +21,15 @@
 import java.io.IOException;
 
 import org.apache.hadoop.mapreduce.JobID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record the submission of a job
  *
  */
 public class JobSubmittedEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private JobID jobid;
-  private  String jobName;
-  private  String userName;
-  private  long submitTime;
-  private  String jobConfPath;
-
-  enum EventFields { EVENT_CATEGORY,
-                     JOB_ID,
-                     JOB_NAME,
-                     USER_NAME,
-                     SUBMIT_TIME,
-                     JOB_CONF_PATH }
+  private Events.JobSubmitted datum = new Events.JobSubmitted();
 
   /**
    * Create an event to record job submission
@@ -55,64 +41,31 @@
    */
   public JobSubmittedEvent(JobID id, String jobName, String userName,
       long submitTime, String jobConfPath) {
-    this.jobid = id;
-    this.jobName = jobName;
-    this.userName = userName;
-    this.submitTime = submitTime;
-    this.jobConfPath = jobConfPath;
-    this.category = EventCategory.JOB;
+    datum.jobid = new Utf8(id.toString());
+    datum.jobName = new Utf8(jobName);
+    datum.userName = new Utf8(userName);
+    datum.submitTime = submitTime;
+    datum.jobConfPath = new Utf8(jobConfPath);
   }
 
-  JobSubmittedEvent() {
+  JobSubmittedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.JobSubmitted)datum;
   }
 
   /** Get the Job Id */
-  public JobID getJobId() { return jobid; }
+  public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the Job name */
-  public String getJobName() { return jobName; }
+  public String getJobName() { return datum.jobName.toString(); }
   /** Get the user name */
-  public String getUserName() { return userName; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public String getUserName() { return datum.userName.toString(); }
   /** Get the submit time */
-  public long getSubmitTime() { return submitTime; }
+  public long getSubmitTime() { return datum.submitTime; }
   /** Get the Path for the Job Configuration file */
-  public String getJobConfPath() { return jobConfPath; }
+  public String getJobConfPath() { return datum.jobConfPath.toString(); }
   /** Get the event type */
-  public EventType getEventType() { return EventType.JOB_SUBMITTED; }
+  public Events.EventType getEventType() { return Events.EventType.JOB_SUBMITTED; }
 
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case JOB_ID: jobid = JobID.forName(jp.getText()); break;
-      case JOB_NAME: jobName = jp.getText(); break;
-      case USER_NAME: userName = jp.getText(); break;
-      case SUBMIT_TIME: submitTime = (long) jp.getLongValue(); break;
-      case JOB_CONF_PATH: jobConfPath = jp.getText(); break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
-    gen.writeStringField(EventFields.JOB_NAME.toString(), jobName);
-    gen.writeStringField(EventFields.USER_NAME.toString(), userName);
-    gen.writeNumberField(EventFields.SUBMIT_TIME.toString(), submitTime);
-    gen.writeStringField(EventFields.JOB_CONF_PATH.toString(), jobConfPath);
-    gen.writeEndObject();
-  }
 }
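
[Going the other way, a decoded Avro record can be handed to a fresh wrapper through setDatum — presumably how the parser reconstitutes events on read. A sketch with invented field values; it assumes the generated Events.JobSubmitted exposes public fields, as the direct assignments above suggest, and it must live in org.apache.hadoop.mapreduce.jobhistory because the no-arg constructor is package-private:

    package org.apache.hadoop.mapreduce.jobhistory;

    import org.apache.avro.util.Utf8;

    public class JobSubmittedSketch {
      public static void main(String[] args) {
        Events.JobSubmitted record = new Events.JobSubmitted();
        record.jobid = new Utf8("job_200909181530_0001");  // all values invented
        record.jobName = new Utf8("wordcount");
        record.userName = new Utf8("alice");
        record.submitTime = 1253312557000L;
        record.jobConfPath = new Utf8("hdfs://nn/job.xml");

        JobSubmittedEvent event = new JobSubmittedEvent(); // package-private ctor
        event.setDatum(record);
        System.out.println(event.getUserName());           // alice
      }
    }
]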

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java Fri Sep 18 22:22:37 2009
@@ -21,29 +21,16 @@
 import java.io.IOException;
 
 import org.apache.hadoop.mapreduce.JobID;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record Failed and Killed completion of jobs
  *
  */
 public class JobUnsuccessfulCompletionEvent implements HistoryEvent {
-
-  private EventCategory category;
-  private JobID jobid;
-  private  long finishTime;
-  private  int finishedMaps;
-  private  int finishedReduces;
-  private  String jobStatus;
-
-  enum EventFields { EVENT_CATEGORY,
-    JOB_ID,
-    FINISH_TIME,
-    FINISHED_MAPS,
-    FINISHED_REDUCES,
-    JOB_STATUS }
+  private Events.JobUnsuccessfulCompletion datum
+    = new Events.JobUnsuccessfulCompletion();
 
   /**
    * Create an event to record unsuccessful completion (killed/failed) of jobs
@@ -56,80 +43,36 @@
   public JobUnsuccessfulCompletionEvent(JobID id, long finishTime,
       int finishedMaps,
       int finishedReduces, String status) {
-    this.jobid = id;
-    this.finishTime = finishTime;
-    this.finishedMaps = finishedMaps;
-    this.finishedReduces = finishedReduces;
-    this.jobStatus = status;
-    this.category = EventCategory.JOB;
+    datum.jobid = new Utf8(id.toString());
+    datum.finishTime = finishTime;
+    datum.finishedMaps = finishedMaps;
+    datum.finishedReduces = finishedReduces;
+    datum.jobStatus = new Utf8(status);
   }
 
-  JobUnsuccessfulCompletionEvent() {
+  JobUnsuccessfulCompletionEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.JobUnsuccessfulCompletion)datum;
   }
 
   /** Get the Job ID */
-  public JobID getJobId() { return jobid; }
+  public JobID getJobId() { return JobID.forName(datum.jobid.toString()); }
   /** Get the job finish time */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the number of finished maps */
-  public int getFinishedMaps() { return finishedMaps; }
+  public int getFinishedMaps() { return datum.finishedMaps; }
   /** Get the number of finished reduces */
-  public int getFinishedReduces() { return finishedReduces; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public int getFinishedReduces() { return datum.finishedReduces; }
   /** Get the status */
-  public String getStatus() { return jobStatus; }
+  public String getStatus() { return datum.jobStatus.toString(); }
   /** Get the event type */
-  public EventType getEventType() {
-    if ("FAILED".equals(jobStatus)) {
-      return EventType.JOB_FAILED;
+  public Events.EventType getEventType() {
+    if ("FAILED".equals(getStatus())) {
+      return Events.EventType.JOB_FAILED;
     } else
-      return EventType.JOB_KILLED;
+      return Events.EventType.JOB_KILLED;
   }
 
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected Token while reading");
-    }
-
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldName = jp.getCurrentName();
-      jp.nextToken(); // Move to the value
-      switch (Enum.valueOf(EventFields.class, fieldName)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case JOB_ID:
-        jobid = JobID.forName(jp.getText());
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case FINISHED_MAPS:
-        finishedMaps = jp.getIntValue();
-        break;
-      case FINISHED_REDUCES:
-        finishedReduces =  jp.getIntValue();
-        break;
-      case JOB_STATUS:
-        jobStatus = jp.getText();
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldName+"'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.JOB_ID.toString(), jobid.toString());
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeNumberField(EventFields.FINISHED_MAPS.toString(), finishedMaps);
-    gen.writeNumberField(EventFields.FINISHED_REDUCES.toString(),
-        finishedReduces);
-    gen.writeStringField(EventFields.JOB_STATUS.toString(), jobStatus);
-    gen.writeEndObject();
-  }
 }
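
[Note the mapping in getEventType above: only the literal status "FAILED" yields JOB_FAILED; any other status string reports JOB_KILLED. A sketch with invented ids and times, not code from the commit:

    import org.apache.hadoop.mapreduce.JobID;
    import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;

    public class UnsuccessfulSketch {
      public static void main(String[] args) {
        JobID id = JobID.forName("job_200909181530_0001");  // invented id
        JobUnsuccessfulCompletionEvent failed =
            new JobUnsuccessfulCompletionEvent(id, 1253312999000L, 8, 2, "FAILED");
        JobUnsuccessfulCompletionEvent killed =
            new JobUnsuccessfulCompletionEvent(id, 1253312999000L, 8, 2, "KILLED");
        System.out.println(failed.getEventType());  // JOB_FAILED
        System.out.println(killed.getEventType());  // JOB_KILLED
      }
    }
]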

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=816801&r1=816800&r2=816801&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Fri Sep 18 22:22:37 2009
@@ -24,41 +24,16 @@
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.codehaus.jackson.JsonGenerator;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
+
+import org.apache.avro.util.Utf8;
 
 /**
  * Event to record successful completion of a map attempt
  *
  */
 public class MapAttemptFinishedEvent  implements HistoryEvent {
-
-  private EventCategory category;
-  private TaskID taskid;
-  private TaskAttemptID attemptId;
-  private TaskType taskType;
-  private String taskStatus;
-  private long mapFinishTime;
-  private long finishTime;
-  private String hostname;
-  private String state;
-  private Counters counters;
+  private Events.MapAttemptFinished datum = new Events.MapAttemptFinished();
   
-  enum EventFields { EVENT_CATEGORY,
-                     TASK_ID,
-                     TASK_ATTEMPT_ID,
-                     TASK_TYPE,
-                     TASK_STATUS,
-                     MAP_FINISH_TIME,
-                     FINISH_TIME,
-                     HOSTNAME,
-                     STATE,
-                     COUNTERS }
-    
-  MapAttemptFinishedEvent() {
-  }
-
   /** 
    * Create an event for successful completion of map attempts
    * @param id Task Attempt ID
@@ -74,105 +49,49 @@
       TaskType taskType, String taskStatus, 
       long mapFinishTime, long finishTime,
       String hostname, String state, Counters counters) {
-    this.taskid = id.getTaskID();
-    this.attemptId = id;
-    this.taskType = taskType;
-    this.taskStatus = taskStatus;
-    this.mapFinishTime = mapFinishTime;
-    this.finishTime = finishTime;
-    this.hostname = hostname;
-    this.state = state;
-    this.counters = counters;
-    this.category = EventCategory.TASK_ATTEMPT;
+    datum.taskid = new Utf8(id.getTaskID().toString());
+    datum.attemptId = new Utf8(id.toString());
+    datum.taskType = new Utf8(taskType.name());
+    datum.taskStatus = new Utf8(taskStatus);
+    datum.mapFinishTime = mapFinishTime;
+    datum.finishTime = finishTime;
+    datum.hostname = new Utf8(hostname);
+    datum.state = new Utf8(state);
+    datum.counters = EventWriter.toAvro(counters);
   }
   
+  MapAttemptFinishedEvent() {}
+
+  public Object getDatum() { return datum; }
+  public void setDatum(Object datum) {
+    this.datum = (Events.MapAttemptFinished)datum;
+  }
+
   /** Get the task ID */
-  public TaskID getTaskId() { return taskid; }
-  /** Get the event category */
-  public EventCategory getEventCategory() { return category; }
+  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
   /** Get the attempt id */
-  public TaskAttemptID getAttemptId() { return attemptId; }
+  public TaskAttemptID getAttemptId() {
+    return TaskAttemptID.forName(datum.attemptId.toString());
+  }
   /** Get the task type */
-  public TaskType getTaskType() { return taskType; }
+  public TaskType getTaskType() {
+    return TaskType.valueOf(datum.taskType.toString());
+  }
   /** Get the task status */
-  public String getTaskStatus() { return taskStatus; }
+  public String getTaskStatus() { return datum.taskStatus.toString(); }
   /** Get the map phase finish time */
-  public long getMapFinishTime() { return mapFinishTime; }
+  public long getMapFinishTime() { return datum.mapFinishTime; }
   /** Get the attempt finish time */
-  public long getFinishTime() { return finishTime; }
+  public long getFinishTime() { return datum.finishTime; }
   /** Get the host name */
-  public String getHostname() { return hostname; }
+  public String getHostname() { return datum.hostname.toString(); }
   /** Get the state string */
-  public String getState() { return state; }
+  public String getState() { return datum.state.toString(); }
   /** Get the counters */
-  public Counters getCounters() { return counters; }
+  Counters getCounters() { return EventReader.fromAvro(datum.counters); }
   /** Get the event type */
-   public EventType getEventType() {
-    return EventType.MAP_ATTEMPT_FINISHED;
+   public Events.EventType getEventType() {
+    return Events.EventType.MAP_ATTEMPT_FINISHED;
   }
   
-  public void readFields(JsonParser jp) throws IOException {
-    if (jp.nextToken() != JsonToken.START_OBJECT) {
-      throw new IOException("Unexpected token while reading");
-    }
-    
-    while (jp.nextToken() != JsonToken.END_OBJECT) {
-      String fieldname = jp.getCurrentName();
-      jp.nextToken(); // move to value
-      switch (Enum.valueOf(EventFields.class, fieldname)) {
-      case EVENT_CATEGORY:
-        category = Enum.valueOf(EventCategory.class, jp.getText());
-        break;
-      case TASK_ID:
-        taskid = TaskID.forName(jp.getText());
-        break;
-      case TASK_ATTEMPT_ID: 
-        attemptId = TaskAttemptID.forName(jp.getText());
-        break;
-      case TASK_TYPE:
-        taskType = TaskType.valueOf(jp.getText());
-        break;
-      case TASK_STATUS:
-        taskStatus = jp.getText();
-        break;
-      case MAP_FINISH_TIME:
-        mapFinishTime = jp.getLongValue();
-        break;
-      case FINISH_TIME:
-        finishTime = jp.getLongValue();
-        break;
-      case HOSTNAME:
-        hostname = jp.getText();
-        break;
-      case STATE:
-        state = jp.getText();
-        break;
-      case COUNTERS:
-        counters = EventReader.readCounters(jp);
-        break;
-      default: 
-        throw new IOException("Unrecognized field '"+fieldname+"'!");
-      }
-    }
-  }
-
-  public void writeFields(JsonGenerator gen) throws IOException {
-    gen.writeStartObject();
-    gen.writeStringField(EventFields.EVENT_CATEGORY.toString(),
-        category.toString());
-    gen.writeStringField(EventFields.TASK_ID.toString(), taskid.toString());
-    gen.writeStringField(EventFields.TASK_ATTEMPT_ID.toString(),
-        attemptId.toString());
-    gen.writeStringField(EventFields.TASK_TYPE.toString(), 
-        taskType.toString());
-    gen.writeStringField(EventFields.TASK_STATUS.toString(),
-        taskStatus);
-    gen.writeNumberField(EventFields.MAP_FINISH_TIME.toString(),
-        mapFinishTime);
-    gen.writeNumberField(EventFields.FINISH_TIME.toString(), finishTime);
-    gen.writeStringField(EventFields.HOSTNAME.toString(), hostname);
-    gen.writeStringField(EventFields.STATE.toString(), state);
-    EventWriter.writeCounters(counters, gen);
-    gen.writeEndObject();
-  }
 }
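
[Counters are the one structured field in these events: they pass through the EventWriter.toAvro / EventReader.fromAvro helpers referenced above rather than a plain string. A round-trip sketch with an invented counter; since the helpers and the new getCounters() appear package-scoped, it assumes the code runs inside org.apache.hadoop.mapreduce.jobhistory:

    package org.apache.hadoop.mapreduce.jobhistory;

    import org.apache.hadoop.mapreduce.Counters;

    public class CounterRoundTripSketch {
      public static void main(String[] args) {
        Counters counters = new Counters();
        counters.findCounter("MY_GROUP", "MY_COUNTER").increment(42);  // invented names
        Counters copy = EventReader.fromAvro(EventWriter.toAvro(counters));
        System.out.println(
            copy.findCounter("MY_GROUP", "MY_COUNTER").getValue());    // 42
      }
    }
]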


