avro-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r788462 [1/3] - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/generic/ src/java/org/apache/avro/io/ src/java/org/apache/avro/io/doc-files/ src/test/java/org/apache/avro/io/
Date Thu, 25 Jun 2009 18:53:17 GMT
Author: cutting
Date: Thu Jun 25 18:53:16 2009
New Revision: 788462

URL: http://svn.apache.org/viewvc?rev=788462&view=rev
Log:
AVRO-29.  Add to Java a validating encoder and decoder, and a resolving decoder.  Contributed by Thiruvalluvan M. G. & Raymie Stata.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/   (with props)
    hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/parsing.html
    hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestParsingTable.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestResolvingIO.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestResolvingTable.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValidatingIO.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java

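For orientation, a minimal sketch of the read path this change enables.  The
constructor and read() signatures are the ones introduced below; the schema
JSON strings, input stream, and the BinaryDecoder constructor are assumptions:

    // Writer and reader schemas differ; with both set, GenericDatumReader
    // now wraps the underlying Decoder in a ResolvingDecoder.
    Schema writer = Schema.parse(writerJson);   // hypothetical JSON string
    Schema reader = Schema.parse(readerJson);   // hypothetical JSON string
    GenericDatumReader<Object> datumReader =
        new GenericDatumReader<Object>(writer, reader);
    Object datum = datumReader.read(null, new BinaryDecoder(in));
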
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=788462&r1=788461&r2=788462&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Jun 25 18:53:16 2009
@@ -41,6 +41,9 @@
 
     AVRO-48.  Add JSON parser for C.  (Matt Massie via cutting)
 
+    AVRO-29.  Add to Java a validating encoder & decoder, and a
+    resolving decoder.  (Thiruvalluvan M. G. & Raymie Stata)
+
   IMPROVEMENTS
 
     AVRO-11.  Re-implement specific and reflect datum readers and

Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=788462&r1=788461&r2=788462&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Thu Jun 25 18:53:16 2009
@@ -18,59 +18,57 @@
 package org.apache.avro.generic;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
-import java.nio.ByteBuffer;
-
-import org.codehaus.jackson.JsonNode;
 
 import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
+import org.codehaus.jackson.JsonNode;
 
 /** {@link DatumReader} for generic Java objects. */
 public class GenericDatumReader<D> implements DatumReader<D> {
-  private Schema actual;
-  private Schema expected;
-
-  public GenericDatumReader() {}
+  private Schema writerSchema;
+  private Schema readerSchema;
+  
+  public GenericDatumReader() { }
 
-  public GenericDatumReader(Schema actual) {
-    setSchema(actual);
+  public GenericDatumReader(Schema schema) {
+    this.readerSchema = schema;
   }
 
-  public GenericDatumReader(Schema actual, Schema expected) {
-    this(actual);
-    this.expected = expected;
+  public GenericDatumReader(Schema writerSchema, Schema readerSchema) {
+    this.writerSchema = writerSchema;
+    this.readerSchema = readerSchema;
   }
 
-  public void setSchema(Schema actual) { this.actual = actual; }
+  public void setSchema(Schema schema) {
+    this.readerSchema = schema;
+  }
 
   @SuppressWarnings("unchecked")
   public D read(D reuse, Decoder in) throws IOException {
-    return (D) read(reuse, actual, expected != null ? expected : actual, in);
+    return (D) read(reuse, readerSchema,
+        writerSchema == null ? in :
+          new ResolvingDecoder(writerSchema, readerSchema, in));
   }
   
   /** Called to read data.*/
-  protected Object read(Object old, Schema actual,
-                        Schema expected, Decoder in) throws IOException {
-    if (actual.getType() == Type.UNION)           // resolve unions
-      actual = actual.getTypes().get((int)in.readIndex());
-    if (expected.getType() == Type.UNION)
-      expected = resolveExpected(actual, expected);
-    switch (actual.getType()) {
-    case RECORD:  return readRecord(old, actual, expected, in);
-    case ENUM:    return readEnum(actual, expected, in);
-    case ARRAY:   return readArray(old, actual, expected, in);
-    case MAP:     return readMap(old, actual, expected, in);
-    case FIXED:   return readFixed(old, actual, expected, in);
+  protected Object read(Object old, Schema schema, Decoder in)
+      throws IOException {
+    switch (schema.getType()) {
+    case RECORD:  return readRecord(old, schema, in);
+    case ENUM:    return readEnum(schema, in);
+    case ARRAY:   return readArray(old, schema, in);
+    case MAP:     return readMap(old, schema, in);
+    case FIXED:   return readFixed(old, schema, in);
     case STRING:  return readString(old, in);
     case BYTES:   return readBytes(old, in);
     case INT:     return in.readInt();
@@ -79,90 +77,62 @@
     case DOUBLE:  return in.readDouble();
     case BOOLEAN: return in.readBoolean();
     case NULL:    return null;
-    default: throw new AvroRuntimeException("Unknown type: "+actual);
+    case UNION:   return readUnion(old, schema, in);
+    default: throw new AvroRuntimeException("Unknown type: " + schema +
+        " " + schema.getType());
     }
   }
 
-  private Schema resolveExpected(Schema actual, Schema expected) {
-    // first scan for exact match
-    for (Schema branch : expected.getTypes())
-      if (branch.getType() == actual.getType())
-        switch (branch.getType()) {
-        case RECORD:
-        case FIXED:
-          String name = branch.getName();
-          if (name == null || name.equals(actual.getName()))
-            return branch;
-          break;
-        default:
-          return branch;
-        }
-    // then scan match via numeric promotion
-    for (Schema branch : expected.getTypes())
-      switch (actual.getType()) {
-      case INT:
-        switch (expected.getType()) {
-        case LONG: case FLOAT: case DOUBLE:
-          return expected;
-        }
-        break;
-      case LONG:
-        switch (expected.getType()) {
-        case FLOAT: case DOUBLE:
-          return expected;
-        }
-        break;
-      case FLOAT:
-        switch (expected.getType()) {
-        case DOUBLE:
-          return expected;
-        }
-        break;
-      }
-    throw new AvroTypeException("Expected "+expected+", found "+actual);
-  }
-
   /** Called to read a record instance. May be overridden for alternate record
    * representations.*/
-  protected Object readRecord(Object old, Schema actual, Schema expected,
+  protected Object readRecord(Object old, Schema schema,
                               Decoder in) throws IOException {
-    /* TODO: We may want to compute the expected and actual mapping and cache
-     * the mapping (keyed by <actual, expected>). */
-    String recordName = expected.getName();
-    if (recordName != null && !recordName.equals(actual.getName()))
-      throw new AvroTypeException("Expected "+expected+", found "+actual);
-    Map<String, Field> expectedFields = expected.getFields();
-    // all fields not in expected should be removed by newRecord.
-    Object record = newRecord(old, expected);
+    if (in instanceof ResolvingDecoder) {
+      return readRecord(old, schema, (ResolvingDecoder) in);
+    }
+    Object record = newRecord(old, schema);
     int size = 0;
-    for (Map.Entry<String, Field> entry : actual.getFields().entrySet()) {
+    for (Map.Entry<String, Field> entry : schema.getFields().entrySet()) {
       String fieldName = entry.getKey();
-      Field actualField = entry.getValue();
-      Field expectedField =
-          expected == actual ? actualField : expectedFields.get(entry.getKey());
-      if (expectedField == null) {
-        skip(actualField.schema(), in);
-        continue;
-      }
-      int fieldPosition = expectedField.pos();
+      Field field = entry.getValue();
+      int fieldPosition = field.pos();
       Object oldDatum =
           (old != null) ? getField(record, fieldName, fieldPosition) : null;
       addField(record, fieldName, fieldPosition,
-               read(oldDatum,actualField.schema(),expectedField.schema(), in));
+               read(oldDatum, field.schema(), in));
       size++;
     }
-    if (expectedFields.size() > size) {           // not all fields set
-      Set<String> actualFields = actual.getFields().keySet();
-      for (Map.Entry<String, Field> entry : expectedFields.entrySet()) {
+    return record;
+  }
+  
+  protected Object readRecord(Object old, Schema schema,
+      ResolvingDecoder in) throws IOException {
+    Object record = newRecord(old, schema);
+    Map<String, Field> readerFields = schema.getFields();
+
+    BitSet bs = new BitSet();
+    for (int i = 0; i < readerFields.size(); i++) {
+      String fn = in.readFieldName();
+      if (fn == null) {
+        break;
+      }
+      Field f = readerFields.get(fn);
+      int fp = f.pos();
+      bs.set(fp);
+      Object oldDatum =
+        (old != null) ? getField(record, fn, fp) : null;
+      addField(record, fn, fp, read(oldDatum, f.schema(), in));
+    }
+    for (Map.Entry<String, Field> entry : readerFields.entrySet()) {
+      Field f = entry.getValue();
+      if (! bs.get(f.pos())) {
         String fieldName = entry.getKey();
-        if (!actualFields.contains(fieldName)) {  // an unset field
-          Field f = entry.getValue();
-          JsonNode json = f.defaultValue();
-          if (json != null)                       // has default
-            addField(record, fieldName, f.pos(),  // add default
-                     defaultFieldValue(old, f.schema(), json));
-          else if (old != null)                   // remove stale value
-            removeField(record, fieldName, entry.getValue().pos());
+        JsonNode json = f.defaultValue();
+        if (json != null) {                     // has default
+          addField(record, fieldName, f.pos(),  // add default
+                   defaultFieldValue(old, f.schema(), json));
+        } else if (old != null) {               // remove stale value
+          removeField(record, fieldName, entry.getValue().pos());
         }
       }
     }
@@ -237,18 +207,16 @@
     case DOUBLE:  return json.getDoubleValue();
     case BOOLEAN: return json.getBooleanValue();
     case NULL:    return null;
-    default: throw new AvroRuntimeException("Unknown type: "+actual);
+    default: throw new AvroRuntimeException("Unknown type: " + schema);
     }
   }
 
   /** Called to read an enum value. May be overridden for alternate enum
    * representations.  By default, returns the symbol as a String. */
-  protected Object readEnum(Schema actual, Schema expected, Decoder in)
+  protected Object readEnum(Schema schema, Decoder in)
     throws IOException {
-    String name = expected.getName();
-    if (name != null && !name.equals(actual.getName()))
-      throw new AvroTypeException("Expected "+expected+", found "+actual);
-    return createEnum(actual.getEnumSymbols().get(in.readEnum()), expected);
+    return createEnum(schema.getEnumSymbols().get(in.readEnum()),
+        schema);
   }
 
   /** Called to create an enum value. May be overridden for alternate enum
@@ -257,19 +225,18 @@
 
   /** Called to read an array instance.  May be overridden for alternate array
    * representations.*/
-  protected Object readArray(Object old, Schema actual, Schema expected,
+  protected Object readArray(Object old, Schema schema,
                              Decoder in) throws IOException {
-    Schema actualType = actual.getElementType();
-    Schema expectedType = expected.getElementType();
+    Schema type = schema.getElementType();
     long l = in.readArrayStart();
     if (l > 0) {
       Object array = newArray(old, (int) l);
       do {
         for (long i = 0; i < l; i++) {
-          addToArray(array, read(peekArray(array), actualType, expectedType, in));  
+          addToArray(array,
+              read(peekArray(array), type, in));  
         }
       } while ((l = in.arrayNext()) > 0);
-      
       return array;
     } else {
       return newArray(old, 0);
@@ -293,10 +260,9 @@
   
   /** Called to read a map instance.  May be overridden for alternate map
    * representations.*/
-  protected Object readMap(Object old, Schema actual, Schema expected,
+  protected Object readMap(Object old, Schema schema,
                            Decoder in) throws IOException {
-    Schema aValue = actual.getValueType();
-    Schema eValue = expected.getValueType();
+    Schema valueType = schema.getValueType();
     long l = in.readMapStart();
     Object map = newMap(old, (int) l);
     if (l > 0) {
@@ -304,7 +270,7 @@
         for (int i = 0; i < l; i++) {
           addToMap(map,
               readString(null, in),
-              read(null, aValue, eValue, in));
+              read(null, valueType, in));
         }
       } while ((l = in.mapNext()) > 0);
     }
@@ -320,13 +286,11 @@
   
   /** Called to read a fixed value. May be overridden for alternate fixed
    * representations.  By default, returns {@link GenericFixed}. */
-  protected Object readFixed(Object old, Schema actual, Schema expected,
+  protected Object readFixed(Object old, Schema schema,
                              Decoder in)
     throws IOException {
-    if (!actual.equals(expected))
-      throw new AvroTypeException("Expected "+expected+", found "+actual);
-    GenericFixed fixed = (GenericFixed)createFixed(old, expected);
-    in.readFixed(fixed.bytes(), 0, actual.getFixedSize());
+    GenericFixed fixed = (GenericFixed) createFixed(old, schema);
+    in.readFixed(fixed.bytes(), 0, schema.getFixedSize());
     return fixed;
   }
 
@@ -346,6 +310,14 @@
     System.arraycopy(bytes, 0, fixed.bytes(), 0, schema.getFixedSize());
     return fixed;
   }
+
+  private Object readUnion(Object old, Schema schema, Decoder in)
+    throws IOException {
+    int idx = in.readIndex();
+    Schema s = schema.getTypes().get(idx);
+    return read(old, s, in);
+  }
+
   /**
    * Called to create new record instances. Subclasses may override to use a
    * different record implementation. The returned instance must conform to the
@@ -408,54 +380,4 @@
    * override to use a different byte array representation.  By default, this
    * calls {@link ByteBuffer#wrap(byte[])}.*/
   protected Object createBytes(byte[] value) { return ByteBuffer.wrap(value); }
-
-  private static final Schema STRING_SCHEMA = Schema.create(Type.STRING);
-
-  /** Skip an instance of a schema. */
-  public static void skip(Schema schema, Decoder in) throws IOException {
-    switch (schema.getType()) {
-    case RECORD:
-      for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
-        skip(entry.getValue(), in);
-      break;
-    case ENUM:
-      in.readInt();
-      break;
-    case ARRAY:
-      Schema elementType = schema.getElementType();
-      for (long l = in.skipArray(); l > 0; l = in.skipArray()) {
-        for (long i = 0; i < l; i++) {
-          skip(elementType, in);
-        }
-      }
-      break;
-    case MAP:
-      Schema value = schema.getValueType();
-      for (long l = in.skipMap(); l > 0; l = in.skipMap()) {
-        for (long i = 0; i < l; i++) {
-          skip(STRING_SCHEMA, in);
-          skip(value, in);
-        }
-      }
-      break;
-    case UNION:
-      skip(schema.getTypes().get((int)in.readIndex()), in);
-      break;
-    case FIXED:
-      in.skipFixed(schema.getFixedSize());
-      break;
-    case STRING:
-    case BYTES:
-      in.skipBytes();
-      break;
-    case INT:     in.readInt();           break;
-    case LONG:    in.readLong();          break;
-    case FLOAT:   in.readFloat();         break;
-    case DOUBLE:  in.readDouble();        break;
-    case BOOLEAN: in.readBoolean();       break;
-    case NULL:                            break;
-    default: throw new RuntimeException("Unknown type: "+schema);
-    }
-  }
-
 }

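The protected hooks above remain the extension point for alternate in-memory
representations.  As a sketch (not part of this patch; MyColor is a
hypothetical Java enum, and the createEnum signature is inferred from its
call site above), a subclass could map Avro enum symbols to Java enums:

    class ColorDatumReader extends GenericDatumReader<Object> {
      @Override
      protected Object createEnum(String symbol, Schema schema) {
        return MyColor.valueOf(symbol);  // default returns the symbol String
      }
    }
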
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java?rev=788462&r1=788461&r2=788462&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java Thu Jun 25 18:53:16 2009
@@ -189,11 +189,6 @@
   }
 
   @Override
-  public void readFixed(byte[] bytes) throws IOException {
-    readFixed(bytes, 0, bytes.length);
-  }
-  
-  @Override
   public void skipFixed(int length) throws IOException {
     doSkipBytes(length);
   }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java?rev=788462&r1=788461&r2=788462&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java Thu Jun 25 18:53:16 2009
@@ -21,7 +21,6 @@
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.avro.AvroTypeException;
 import org.apache.avro.ipc.ByteBufferOutputStream;
 import org.apache.avro.util.Utf8;
 
@@ -134,11 +133,6 @@
   }
 
   @Override
-  public void writeString(String str) throws IOException {
-    writeString(new Utf8(str));
-  }
-
-  @Override
   public void writeBytes(ByteBuffer bytes) throws IOException {
     byteWriter.write(bytes);
   }
@@ -150,21 +144,11 @@
   }
   
   @Override
-  public void writeBytes(byte[] bytes) throws IOException {
-    writeBytes(bytes, 0, bytes.length);
-  }
-
-  @Override
   public void writeFixed(byte[] bytes, int start, int len) throws IOException {
 	  out.write(bytes, start, len);
   }
 
   @Override
-  public void writeFixed(byte[] bytes) throws IOException {
-    writeFixed(bytes, 0, bytes.length);
-  }
-  
-  @Override
   public void writeEnum(int e) throws IOException {
     encodeLong(e, out);
   }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java?rev=788462&r1=788461&r2=788462&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java Thu Jun 25 18:53:16 2009
@@ -17,7 +17,6 @@
  */
 package org.apache.avro.io;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -143,7 +142,9 @@
    *          value to be read or the length is incorrect.
    * @throws IOException
    */
-  public abstract void readFixed(byte[] bytes) throws IOException;
+  public void readFixed(byte[] bytes) throws IOException {
+    readFixed(bytes, 0, bytes.length);
+  }
   
   /**
    * Discards fixed sized binary object.
@@ -276,4 +277,11 @@
    */
   public abstract int readIndex() throws IOException;
 
+  /**
+   * Call this method after reading a complete object that conforms to the
+   * schema, or after an error, if you want to start reading another object.
+   */
+  public void reset() throws IOException {
+  }
 }

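Decoder.reset() is a no-op by default; ResolvingDecoder (below) overrides it
to drain pending skip actions and re-seed its parsing stack.  A sketch of
reading a sequence of objects from one stream (datumReader, decoder, and
moreObjects are hypothetical):

    while (moreObjects) {
      Object datum = datumReader.read(null, decoder);
      decoder.reset();          // start clean before the next object
    }
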
Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java?rev=788462&r1=788461&r2=788462&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java Thu Jun 25 18:53:16 2009
@@ -107,7 +107,9 @@
    * @throws AvroTypeException If this is a stateful writer and a
    * char-string is not expected
    */
-  public abstract void writeString(String str) throws IOException;
+  public void writeString(String str) throws IOException {
+    writeString(new Utf8(str));
+  }
 
   /**
    * Write a byte string.
@@ -130,7 +132,9 @@
    * @throws AvroTypeException If this is a stateful writer and a
    * byte-string is not expected
    */
-  public abstract void writeBytes(byte[] bytes) throws IOException;
+  public void writeBytes(byte[] bytes) throws IOException {
+    writeBytes(bytes, 0, bytes.length);
+  }
 
   /**
    * Writes a fixed size binary object.
@@ -148,7 +152,9 @@
    * A shorthand for <tt>writeFixed(bytes, 0, bytes.length)</tt>
    * @param bytes
    */
-  public abstract void writeFixed(byte[] bytes) throws IOException;
+  public void writeFixed(byte[] bytes) throws IOException {
+    writeFixed(bytes, 0, bytes.length);
+  }
   
   /**
    * Writes an enumeration.

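With this change the single-argument convenience methods have default
implementations, so a concrete Encoder need only supply the primitive forms.
The equivalences, restated from the bodies above (buf and md5 are
illustrative variables):

    encoder.writeString("abc");  // == writeString(new Utf8("abc"))
    encoder.writeBytes(buf);     // == writeBytes(buf, 0, buf.length)
    encoder.writeFixed(md5);     // == writeFixed(md5, 0, md5.length)
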
Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java?rev=788462&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java Thu Jun 25 18:53:16 2009
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.AvroTypeException;
+
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * The parsing table: a grammar, derived from a schema, that drives the
+ * validating and resolving encoders and decoders.
+ */
+class ParsingTable {
+  /**
+   * The terminal symbols for the grammar. Symbols also serve as indexes
+   * into the production table {@link #prods}. Since terminals do not have
+   * productions, we can use negative values for them.
+   */
+  protected static final int NULL = -1;
+  protected static final int BOOLEAN = NULL - 1;
+  protected static final int INT = BOOLEAN - 1;
+  protected static final int LONG = INT - 1;
+  protected static final int FLOAT = LONG - 1;
+  protected static final int DOUBLE = FLOAT - 1;
+  protected static final int STRING = DOUBLE - 1;
+  protected static final int BYTES = STRING - 1;
+  protected static final int FIXED = BYTES - 1;
+  protected static final int ENUM = FIXED - 1;
+  protected static final int UNION = ENUM - 1;
+
+  protected static final int ARRAYSTART = UNION - 1;
+  protected static final int ARRAYEND = ARRAYSTART - 1;
+  protected static final int MAPSTART = ARRAYEND - 1;
+  protected static final int MAPEND = MAPSTART - 1;
+
+  protected static final int LASTPRIM = MAPEND;
+
+  private static final String[] terminalNames = {
+    "", "null", "boolean", "int", "long", "float", "double", "string", "bytes",
+    "fixed", "enum", "union", "array-start", "array-end", "map-start",
+    "map-end",
+  };
+  
+  protected int root;
+
+  /**
+   * If {@link #secondPass} <tt>== false</tt>, the number of entries in
+   * {@link #prods} for non-terminals. Otherwise, one past the last entry
+   * for non-terminals in {@link #prods}.
+   */
+  protected int nonTerminals;
+
+  /**
+   * If {@link #secondPass} <tt>== false</tt>, the number of entries in
+   * {@link #prods} for repeaters. Otherwise, one past the last entry
+   * for repeaters in {@link #prods}.
+   */
+  protected int repeaters;
+
+  /**
+   * If {@link #secondPass} <tt>== false</tt>, the number of entries in
+   * {@link #prods} for unions. Otherwise, one past the last entry
+   * for unions in {@link #prods}.
+   */
+  protected int unions;
+  
+  /**
+   * If {@link #secondPass} <tt>== false</tt>, the number of entries in
+   * {@link #prods} for fixeds. Otherwise, one past the last entry
+   * for fixeds in {@link #prods}.
+   */
+  protected int fixeds;
+  
+  /**
+   * If {@link #secondPass} <tt>== false</tt>, the number of entries in
+   * {@link #prods} for enums. Otherwise, one past the last entry
+   * for enums in {@link #prods}.
+   */
+  protected int enums;
+  
+  /**
+   * The productions for non-terminals, repeaters and unions.
+   */
+  protected int[] prods;
+
+  /**
+   * In the two-pass construction of the table, <tt>false</tt> during
+   * the first pass and <tt>true</tt> during the second pass.
+   */
+  protected boolean secondPass = false;
+
+  protected ParsingTable() { }
+
+  public ParsingTable(Schema sc) {
+    generate(sc, new HashMap<LitS, Integer>());
+
+    int total = nonTerminals + repeaters + unions + fixeds + enums;
+    prods = new int[total];
+    enums = total - enums;
+    fixeds = enums - fixeds;
+    unions = fixeds - unions;
+    repeaters = unions - repeaters;
+    nonTerminals = repeaters - nonTerminals;
+
+    secondPass = true;
+    root = generate(sc, new HashMap<LitS, Integer>());
+  }
+    
+  /**
+   * Returns the id for the non-terminal that is the start symbol
+   * for the given schema <tt>sc</tt>. If there is already an entry
+   * for the given schema in the given map <tt>seen</tt> then
+   * the id corresponding to that entry is returned. Otherwise
+   * a new id is generated and an entry is inserted into the map.
+   * @param sc    The schema for which a start symbol is required.
+   * @param seen  A map of schema-to-id mappings made so far.
+   * @return The id of the start symbol for <tt>sc</tt>.
+   */
+  protected final int generate(Schema sc, Map<LitS, Integer> seen) {
+    switch (sc.getType()) {
+    case NULL:
+      return NULL;
+    case BOOLEAN:
+      return BOOLEAN;
+    case INT:
+      return INT;
+    case LONG:
+      return LONG;
+    case FLOAT:
+      return FLOAT;
+    case DOUBLE:
+      return DOUBLE;
+    case STRING:
+      return STRING;
+    case BYTES:
+      return BYTES;
+    case FIXED:
+      return mkNonTerm(FIXED, mkFixed(sc.getFixedSize()));
+    case ENUM:
+      return mkNonTerm(ENUM, mkEnum(sc.getEnumSymbols().size()));
+    case ARRAY:
+      int ar_et = generate(sc.getElementType(), seen);
+      int r_et = mkRepeater(ar_et);
+      return mkNonTerm(ARRAYSTART, r_et, ARRAYEND);
+    case MAP:
+      int ar_vt = generate(sc.getValueType(), seen);
+      int r_vt = mkRepeater(STRING, ar_vt);
+      return mkNonTerm(MAPSTART, r_vt, MAPEND);
+    case RECORD:
+      LitS wsc = new LitS(sc);
+      Integer rresult = seen.get(wsc);
+      if (rresult == null) {
+        int size = sc.getFields().size();
+        rresult = allocNonTerm(size);
+        int i = rresult + size;
+        for (Field f : sc.getFields().values()) {
+          set(--i, generate(f.schema(), seen));
+        }
+        seen.put(wsc, rresult);
+      }
+      return rresult;
+
+    case UNION:
+      List<Schema> subs = sc.getTypes();
+      int u = allocUnion(subs.size());
+      for (Schema b : sc.getTypes()) {
+        set(u++, generate(b, seen));
+      }
+      return mkNonTerm(UNION, u - subs.size());
+
+    default:
+      throw new RuntimeException("Unexpected schema type");
+    }
+  }
+
+  protected final int set(int index, int value) {
+    if (secondPass) {
+      prods[index] = value;
+    }
+    return value;
+  }
+
+  /** Allocates a new non-terminal whose production consists of
+   * <tt>len</tt> symbols.
+   * Each non-terminal has a unique integer id.
+   * @param len Number of symbols in the production of the freshly
+   * allocated non-terminal.
+   * @return  The id of the newly allocated non-terminal.
+   */
+  protected final int allocNonTerm(int len) {
+    set(nonTerminals, len);
+    nonTerminals += (len + 1);
+    return nonTerminals - len;
+  }
+
+  protected final int mkNonTerm(int e1) {
+    int nt = nonTerminals;
+    set(nt, e1);
+    nonTerminals += 2;
+    return nt;
+  }
+
+  protected final int mkNonTerm(int e1, int e2) {
+    set(nonTerminals++, 2);
+    set(nonTerminals++, e2);
+    set(nonTerminals++, e1);
+    return nonTerminals - 2;
+  }
+
+  protected final int mkNonTerm(int e1, int e2, int e3) {
+    set(nonTerminals++, 3);
+    set(nonTerminals++, e3);
+    set(nonTerminals++, e2);
+    set(nonTerminals++, e1);
+    return nonTerminals - 3;
+  }
+
+  protected final int mkRepeater(int e1) {
+    int i = repeaters;
+    set(i++, 2);
+    set(i, i) /*recursion*/;
+    set(i + 1, e1);
+    repeaters += 3;
+    return i;
+  }
+
+  protected final int mkRepeater(int e1, int e2) {
+    int i = repeaters;
+    set(i++, 3);
+    set(i, i) /* recursion */;
+    set(i + 1, e2);
+    set(i + 2, e1);
+    repeaters += 4;
+    return i;
+  }
+
+  protected final int allocUnion(int len) {
+    set(unions, len);
+    unions += (len + 1);
+    return unions - len;
+  }
+  
+  protected final int mkFixed(int size) {
+    set(fixeds++, 1);
+    set(fixeds++, size);
+    return fixeds - 1;
+  }
+
+  protected final int mkEnum(int max) {
+    set(enums++, 1);
+    set(enums++, max);
+    return enums - 1;
+  }
+
+  public final int size(int sym) {
+    return prods[sym - 1];
+  }
+
+  /**
+   * Returns <tt>true</tt> iff the given symbol <tt>sym</tt> is terminal.
+   * @param sym The symbol to be tested.
+   * @return <tt>true</tt> iff <tt>sym</tt> is a terminal symbol.
+   */
+  public final boolean isTerminal(int sym) {
+    return sym < 0;
+  }
+
+  public final boolean isNonTerminal(int sym) {
+    return 0 <= sym && sym < nonTerminals;
+  }
+
+  public final boolean isRepeater(int sym) {
+    return nonTerminals <= sym && sym < repeaters;
+  }
+
+  public final boolean isUnion(int sym) {
+    return repeaters <= sym && sym < unions;
+  }
+  
+  public final boolean isFixed(int sym) {
+    return unions <= sym && sym < fixeds;
+  }
+  
+  public final boolean isEnum(int sym) {
+    return fixeds <= sym && sym < enums;
+  }
+
+  public final int getBranch(int union, int unionIndex) {
+    // assert isUnion(union);
+    if (unionIndex < 0 || size(union) <= unionIndex) {
+       throw new AvroTypeException("Union index out of bounds ("
+                                   + unionIndex + ")");
+    }
+    return prods[union + unionIndex];
+  }
+
+  public final int getFixedSize(int sym) {
+    return prods[sym];
+  }
+
+  public final int getEnumMax(int sym) {
+    return prods[sym];
+  }
+
+  /** A wrapper around Schema that does "==" equality. */
+  protected static class LitS {
+    public final Schema actual;
+    public LitS(Schema actual) { this.actual = actual; }
+    
+    /**
+     * Two LitS are equal if and only if their underlying schema is
+     * the same (not merely equal).
+     */
+    public boolean equals(Object o) {
+      if (! (o instanceof LitS)) return false;
+      return actual == ((LitS)o).actual;
+    }
+    
+    public int hashCode() {
+      return actual.hashCode();
+    }
+  }
+
+  /**
+   * Returns the name for the terminal. Useful for generating diagnostic
+   * messages.
+   * @param n The terminal symbol for which the name is required.
+   * @return The name of the terminal <tt>n</tt>.
+   */
+  public static String getTerminalName(int n) {
+    assert n < 0 && n >= LASTPRIM;
+    return terminalNames[-n];
+  }
+  
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    appendTo(sb, "root", root);
+    appendTo(sb, "enums", enums);
+    appendTo(sb, "fixeds", fixeds);
+    appendTo(sb, "unions", unions);
+    appendTo(sb, "repeaters", repeaters);
+    
+    appendTo(sb, prods);
+    return sb.toString();
+  }
+
+  protected static void appendTo(StringBuffer sb, int[] prods) {
+    sb.append('[');
+    for (int i = 0; i < prods.length; i++) {
+      if (i != 0) {
+        sb.append(", ");
+      }
+      sb.append(prods[i]);
+    }
+    sb.append(']');
+  }
+
+  protected static void appendTo(StringBuffer sb, String name, int value) {
+    sb.append(name);
+    sb.append(" = ");
+    sb.append(value);
+    sb.append(", ");
+  }
+
+}

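A hand-worked illustration of the encoding (derived from generate() above,
not stated in the patch): for the schema {"type": "array", "items": "int"},
generate() builds a repeater R and returns a non-terminal N whose production
is, in effect,

    N ::= ARRAYSTART R ARRAYEND    // stored reversed in prods
    R ::= INT R | <empty>          // mkRepeater: the production refers to itself

Terminals are the negative ids (INT == -3, ARRAYSTART == -12, and so on), so
isTerminal() is a simple sign test, and a parsing stack can mix terminals and
production indexes in a single int array.
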
Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java?rev=788462&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java Thu Jun 25 18:53:16 2009
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.IOException;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+
+/**
+ * {@link Decoder} that performs type-resolution between the reader's and
+ * writer's schemas.
+ *
+ * <p>When resolving schemas, this class will return the values of fields in
+ * _writer's_ order, not the reader's order.  (However, it returns _only_ the
+ * reader's fields, not any extra fields the writer may have written.)  To help
+ * clients handle fields that appear to be coming out of order, this class
+ * defines the methods {@link #readFieldName} and {@link #readFieldIndex}.
+ * When called just before reading the value of a record-field, they return the
+ * name/index of the field about to be read.  See {@link #readFieldIndex} for
+ * usage details.
+ *
+ * <p>See the <a href="doc-files/parsing.html">parser documentation</a> for
+ *  information on how this works.*/
+public class ResolvingDecoder extends ValidatingDecoder {
+  private ResolvingTable rtable;
+
+  public ResolvingDecoder(Schema writer, Schema reader, Decoder in)
+    throws IOException {
+    super(new ResolvingTable(writer, reader), in);
+    rtable = (ResolvingTable) table;
+  }
+
+  /** Returns the name of the next field of the record we're reading.
+    * Similar to {@link #readFieldIndex} -- see that method for
+    * details.
+    *
+    * @throws IllegalStateException If we're not about to read a record-field
+    */
+  public String readFieldName() throws IOException {
+    int actual = advance(ResolvingTable.FIELDACTION);
+    return rtable.getFieldName(actual);
+  }
+
+  /** Returns the (zero-based) index of the next field of the record
+    * we're reading.
+    *
+    * This method is useful because {@link ResolvingDecoder}
+    * returns values in the order written by the writer, rather than
+    * the order expected by the reader.  This method allows readers
+    * to figure out what fields to expect.  Let's say the reader is
+    * expecting a three-field record: the first field is a long, the
+    * second a string, and the third an array of ints.  In this case,
+    * a typical usage might be as follows:
+    * <pre>
+    *   for (int i = 0; i < 3; i++) {
+    *     switch (in.readFieldIndex()) {
+    *     case 0:
+    *       foo(in.readLong());
+    *       break;
+    *     case 1:
+    *       someVariable = in.readString();
+    *       break;
+    *     case 2:
+    *       bar(in); // The code of "bar" will read an array-of-int
+    *       break;
+    *     }
+    *   }
+    * </pre>
+    * Note that {@link ResolvingDecoder} will return only the
+    * fields expected by the reader, not other fields that may have
+    * been written by the writer.  Thus, the iteration-count of "3" in
+    * the above loop will always be correct.
+    *
+    * Throws a runtime exception if we're not just about to read the
+    * field of a record.  Also, this method (and {@link
+    * #readFieldName}) will <i>consume</i> the field information, and
+    * thus may only be called <i>once</i> before reading the field
+    * value.  (However, if the client knows the order of incoming
+    * fields and does not need to reorder them, then the client does
+    * <i>not</i> need to call this or {@link #readFieldName}.)
+    *
+    * @throws IllegalStateException If we're not about to read a record-field
+    */
+  public int readFieldIndex() throws IOException {
+    int actual = advance(ResolvingTable.FIELDACTION);
+    return rtable.getFieldIndex(actual);
+  }
+
+  @Override
+  public long readLong() throws IOException {
+    int actual = advance(ResolvingTable.LONG);
+    if (actual == ResolvingTable.INT) {
+      return in.readInt();
+    } else if (actual == ResolvingTable.DOUBLE) {
+      return (long) in.readDouble();
+    } else {
+      assert actual == ResolvingTable.LONG;
+      return in.readLong();
+    }
+  }
+    
+  @Override
+  public double readDouble() throws IOException {
+    int actual = advance(ResolvingTable.DOUBLE);
+    if (actual == ResolvingTable.INT) {
+      return (double) in.readInt();
+    } else if (actual == ResolvingTable.LONG) {
+      return (double) in.readLong();
+    } else if (actual == ResolvingTable.FLOAT) {
+      return (double) in.readFloat();
+    } else {
+      assert actual == ResolvingTable.DOUBLE;
+      return in.readDouble();
+    }
+  }
+  
+  @Override
+  public int readEnum() throws IOException {
+    advance(ResolvingTable.ENUM);
+    int top = stack[--pos];
+    int n = in.readEnum();
+    if (n >= 0 && n < rtable.size(top)) {
+      n = rtable.getEnumAction(top, n);
+      if (rtable.isEnumAction(n)) {
+        return rtable.getEnumValue(n);
+      } else {
+        assert rtable.isErrorAction(n);
+        throw new AvroTypeException(rtable.getMessage(n));
+      }
+    } else {
+      throw new AvroTypeException("Enumeration out of range: " + n
+          + " max: " + rtable.size(top));
+    }
+  }
+    
+  @Override
+  public int readIndex() throws IOException {
+    advance(ParsingTable.UNION);
+    int actual = stack[--pos];
+    if (rtable.isUnion(actual)) {
+      actual = rtable.getBranch(actual, in.readInt());
+    }
+    if (rtable.isReaderUnionAction(actual)) {
+      // Both reader and writer were unions.  Based on
+      // the writer's actual branch, go get the appropriate
+      // readerUnionAction.
+      stack[pos++] = rtable.getReaderUnionSym(actual);
+      return rtable.getReaderUnionIndex(actual);
+    } else {
+      throw new AvroTypeException("Unexpected index read");
+    }
+  }
+
+  @Override
+  protected int skipSymbol(int sym, int p) throws IOException {
+    if (rtable.isResolverAction(sym)) {
+      return skipSymbol(rtable.getResolverActual(sym), -1);
+    } else {
+      return super.skipSymbol(sym, p);
+    }
+  }
+
+  @Override
+  protected int advance(int input) throws IOException {
+    int top = stack[--pos];
+    while (! rtable.isTerminal(top)) {
+      if (rtable.isAction(top)) {
+        if (rtable.isFieldAction(top)) {
+          if (input == ResolvingTable.FIELDACTION) return top;
+        } else if (rtable.isResolverAction(top)) {
+          return rtable.getResolverActual(top);
+        } else if (rtable.isSkipAction(top)) {
+          skipSymbol(rtable.getProductionToSkip(top), -1);
+        } else if (rtable.isWriterUnionAction(top)) {
+          stack[pos++] = rtable.getBranch(top, in.readIndex());
+        } else if (rtable.isErrorAction(top)) {
+          throw new AvroTypeException(rtable.getMessage(top));
+        }
+      } else if (! rtable.isRepeater(top)
+                 || (input != ParsingTable.ARRAYEND
+                     && input != ParsingTable.MAPEND)) {
+        int plen = rtable.size(top);
+        if (stack.length < pos + plen) {
+          stack = expand(stack, pos + plen);
+        }
+        System.arraycopy(rtable.prods, top, stack, pos, plen);
+        pos += plen;
+      }
+      top = stack[--pos];
+    }
+    if (top == input) {
+      return top;
+    }
+    throw new AvroTypeException("Attempt to read " + input
+                                + " when a " + top + " was expected.");
+  }
+
+  public void reset() throws IOException {
+    while (pos > 0) {
+      if (rtable.isSkipAction(stack[pos - 1])) {
+        skipProduction(rtable.getProductionToSkip(stack[--pos]));
+      } else {
+        throw new AvroTypeException("Data not fully drained.");
+      }
+    }
+    stack[pos++] = table.root;
+  }
+}
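In isolation, the field-reading pattern that GenericDatumReader.readRecord
adopts above looks like this (a sketch; writerSchema, readerSchema,
binaryDecoder, and nReaderFields are hypothetical):

    ResolvingDecoder in =
        new ResolvingDecoder(writerSchema, readerSchema, binaryDecoder);
    for (int i = 0; i < nReaderFields; i++) {
      String fn = in.readFieldName();  // fields arrive in the writer's order
      if (fn == null) break;           // remaining reader fields take defaults
      // dispatch on fn to the readXyz() call for that field's schema
    }
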

Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java?rev=788462&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java Thu Jun 25 18:53:16 2009
@@ -0,0 +1,489 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+
+import java.util.*;
+
+/**
+ * The parsing table for "resolving" between two schemas. This class
+ * is useful for validating inputs, not outputs. There is a reader's
+ * schema and a writer's schema. The reader wants data according to the
+ * reader's schema, but the data itself has been written according to the
+ * writer's schema. While reading, the reader gets the impression that the
+ * data is in the reader's schema (with one exception, noted below). The
+ * schema-resolution process performs the required transformation(s). Of
+ * course, for this to work the two schemas must be "compatible". Though some
+ * of the incompatibilities could be detected statically by comparing the two
+ * schemas, we report incompatibilities only when the data is actually read.
+ * 
+ * While reading records, the fields are returned according to the
+ * writer's schema and not the reader's. This apparently unintuitive order
+ * is used in order to avoid caching at the ResolvingDecoder level.
+ */
+class ResolvingTable extends ParsingTable {
+  public static final int FIELDACTION = LASTPRIM - 1;
+
+  protected int errorActions;
+  protected int readerUnionActions;
+  protected int resolverActions;
+  protected int fieldActions;
+  protected int skipActions;
+  protected int writerUnionActions;
+  protected int enumActions;
+
+  private String[] strings;
+  // Index into strings array. Used during construction.
+  private int spos;
+
+  public ResolvingTable(Schema writer, Schema reader) {
+    generate(writer, reader, new HashMap<LitS,Integer>());
+
+    int total = (errorActions + readerUnionActions + resolverActions
+                 + nonTerminals + repeaters + unions + fixeds + enums
+                 + fieldActions + skipActions + writerUnionActions
+                 + enumActions);
+
+    prods = new int[total];
+    strings = new String[errorActions + fieldActions];
+
+    enumActions = total - enumActions;
+    writerUnionActions = enumActions - writerUnionActions;
+    skipActions = writerUnionActions - skipActions;
+    fieldActions = skipActions - fieldActions;
+    resolverActions = fieldActions - resolverActions;
+    readerUnionActions = resolverActions - readerUnionActions;
+    errorActions = readerUnionActions - errorActions;
+    enums = errorActions - enums;
+    fixeds = enums - fixeds;
+    unions = fixeds - unions;
+    repeaters = unions - repeaters;
+    nonTerminals = repeaters - nonTerminals;
+    spos = 0;
+
+    secondPass = true;
+    root = generate(writer, reader, new HashMap<LitS,Integer>());
+  }
+
+  protected final int generate(Schema writer, Schema reader,
+      Map<LitS,Integer> seen) {
+    Schema.Type writerType = writer.getType();
+    Schema.Type readerType = reader.getType();
+
+    if (writerType == readerType) {
+      switch (writerType) {
+      case NULL:
+        return NULL;
+      case BOOLEAN:
+        return BOOLEAN;
+      case INT:
+        return INT;
+      case LONG:
+        return LONG;
+      case FLOAT:
+        return FLOAT;
+      case DOUBLE:
+        return DOUBLE;
+      case STRING:
+        return STRING;
+      case BYTES:
+        return BYTES;
+      }
+    }
+
+    if (writerType == Schema.Type.UNION && readerType != Schema.Type.UNION) {
+      List<Schema> branches = writer.getTypes();
+      int u = allocWriterUnionAction(branches.size());
+      for (Schema w : branches) {
+        set(u++, generate(w, reader, seen));
+      }
+      return u - branches.size();
+    }
+
+    switch (readerType) {
+    case NULL:
+    case BOOLEAN:
+    case INT:
+    case STRING:
+    case FLOAT:
+    case BYTES:
+      return mkErrorAction("Found " + writer + ", expecting " + reader);
+
+    case LONG:
+      switch (writerType) {
+      case INT:
+      case DOUBLE:
+      case FLOAT:
+        return mkResolverAction(generate(writer, seen));
+      default:
+        return mkErrorAction("Found " + writer + ", expecting " + reader);
+      }
+
+    case DOUBLE:
+      switch (writerType) {
+      case INT:
+      case LONG:
+      case FLOAT:
+        return mkResolverAction(generate(writer, seen));
+      default:
+        return mkErrorAction("Found " + writer + ", expecting " + reader);
+      }
+
+    case FIXED:
+      if (writerType != Schema.Type.FIXED
+          || (writer.getName() != null
+              && ! writer.getName().equals(reader.getName()))) {
+        return mkErrorAction("Found " + writer + ", expecting " + reader);
+      } else {
+        return (writer.getFixedSize() == reader.getFixedSize()) ?
+          mkNonTerm(FIXED, mkFixed(reader.getFixedSize())) :
+          mkErrorAction("Size mismatch in fixed field: found "
+              + writer.getFixedSize() + ", expecting " + reader.getFixedSize());
+      }
+    case ENUM:
+      if (writerType != Schema.Type.ENUM
+          || (writer.getName() != null
+              && ! writer.getName().equals(reader.getName()))) {
+        return mkErrorAction("Found " + writer + ", expecting " + reader);
+      } else {
+        int result = allocEnumAction(writer.getEnumSymbols().size());
+        int u = result;
+        for (String n : writer.getEnumSymbols()) {
+            set(u++, mkEnumAction(n, reader));
+        }
+        return mkNonTerm(ENUM, result);
+      }
+
+    case ARRAY:
+      if (writerType != Schema.Type.ARRAY) {
+        return mkErrorAction("Found " + writer + ", expecting " + reader);
+      } else {
+        int ar_et = generate(writer.getElementType(),
+                             reader.getElementType(), seen);
+        int r_et = mkRepeater(ar_et);
+        return mkNonTerm(ARRAYSTART, r_et, ARRAYEND);
+      }
+
+    case MAP:
+      if (writerType != Schema.Type.MAP) {
+        return mkErrorAction("Found " + writer + ", expecting " + reader);
+      } else {
+        int ar_vt = generate(writer.getValueType(),
+                             reader.getValueType(),
+                             seen);
+        int r_vt = mkRepeater(STRING, ar_vt);
+        return mkNonTerm(MAPSTART, r_vt, MAPEND);
+      }
+
+    case RECORD:
+      LitS wsc = new LitS2(writer, reader);
+      if (seen.get(wsc) == null) {
+        int result;
+        if (writerType != Schema.Type.RECORD
+            || (writer.getName() != null
+                && ! writer.getName().equals(reader.getName()))) {
+          result = mkErrorAction("Found " + writer + ", expecting " + reader);
+        } else {
+          outer:
+          do {
+            Map<String, Field> wfields = writer.getFields();
+            Map<String, Field> rfields = reader.getFields();
+            /*
+             * For now, every field of the read-record that has no default
+             * value must be present in the write-record.
+             * The write-record may have additional fields, which will be
+             * skipped during the read.
+             */
+
+            boolean useDefault = false;
+            int rsize = 0;
+            for (Map.Entry<String, Field> e : rfields.entrySet()) {
+              Field f = wfields.get(e.getKey());
+              if (f == null) {
+                Field wf = e.getValue();
+                if (wf.defaultValue() == null) {
+                  result = mkErrorAction("Found " + writer + ", expecting " + reader);
+                  break outer;
+                } else {
+                  useDefault = true;
+                }
+              } else {
+                rsize++;
+              }
+            }
+            int size = 2 * rsize + (wfields.size() - rsize);
+            if (useDefault) {
+              rsize++;
+            }
+            result = allocNonTerm(size);
+            int i = result + size;
+            for (Map.Entry<String, Field> wf : wfields.entrySet()) {
+              String fname = wf.getKey();
+              Field rf = rfields.get(fname);
+              if (rf == null) {
+                set(--i, mkSkipAction(generate(wf.getValue().schema(), seen)));
+              } else {
+                set(--i, mkFieldAction(rf.pos(), fname));
+                set(--i, generate(wf.getValue().schema(), rf.schema(), seen));
+              }
+            }
+            /*
+             *  Insert a "special" field action to indicate that there are
+             *  no more fields for this record, but some reader fields
+             *  need to be filled with default values.
+             */
+            if (useDefault) {
+              set(--i, mkFieldAction(-1, null));
+            }
+          } while (false);
+        }
+        seen.put(wsc, result);
+      }
+      return seen.get(wsc);
+
+    case UNION:
+      if (writerType != Schema.Type.UNION) { // Only reader is union
+        return mkNonTerm(UNION, mkReaderUnionAction(writer, reader, seen));
+      } else { // Both reader and writer are unions
+        int result = allocUnion(writer.getTypes().size());
+        int u = result;
+        for (Schema w : writer.getTypes()) {
+          set(u++, mkReaderUnionAction(w, reader, seen));
+        }
+        return mkNonTerm(UNION, result);
+      }
+
+    default:
+      throw new RuntimeException("Unexpected schema type: " + readerType);
+    }
+  }
+
+  protected int mkString(String s) {
+    if (secondPass) {
+      strings[spos] = s;
+    }
+    return spos++;
+  }
+
+  private final int mkErrorAction(String message) {
+    set(errorActions, mkString(message));
+    return errorActions++;
+  }
+
+  private final int mkReaderUnionAction(Schema w, Schema r, Map<LitS, Integer> s) {
+    int j = bestBranch(r, w);
+    if (j < 0) {
+      return mkErrorAction("Found " + w + ", expecting " + r);
+    } else {
+      int result = readerUnionActions;
+      readerUnionActions += 2;
+      set(result, j);
+      set(result + 1, generate(w, r.getTypes().get(j), s));
+      return result;
+    }
+  }
+
+  private int bestBranch(Schema r, Schema w) {
+    Schema.Type vt = w.getType();
+    // first scan for exact match
+    int j = 0;
+    for (Schema b : r.getTypes()) {
+      if (vt == b.getType())
+        if (vt == Type.RECORD) {
+          String vname = w.getName();
+          if (vname == null || vname.equals(b.getName()))
+            return j;
+        } else
+          return j;
+      j++;
+    }
+
+    // then scan match via numeric promotion
+    j = 0;
+    for (Schema b : r.getTypes()) {
+      switch (vt) {
+      case INT:
+        switch (b.getType()) {
+        case LONG: case DOUBLE:
+          return j;
+        }
+        break;
+      case LONG:
+      case FLOAT:
+        switch (b.getType()) {
+        case DOUBLE:
+          return j;
+        }
+        break;
+      }
+      j++;
+    }
+    return -1;
+  }
+
+  private final int mkResolverAction(int actual) {
+    set(resolverActions, actual);
+    return resolverActions++;
+  }
+
+  private final int mkFieldAction(int index, String name) {
+    int result = fieldActions;
+    fieldActions += 2;
+    set(result, index);
+    set(result + 1, mkString(name));
+    return result;
+  }
+
+  private final int mkSkipAction(int toSkip) {
+    set(skipActions, toSkip);
+    return skipActions++;
+  }
+
+  private final int allocWriterUnionAction(int len) {
+    set(writerUnionActions, len);
+    writerUnionActions += (len + 1);
+    return writerUnionActions - len;
+  }
+  
+  private final int allocEnumAction(int len) {
+    set(enumActions, len);
+    enumActions += (len + 1);
+    return enumActions - len;
+  }
+
+  private final int mkEnumAction(String n, Schema r) {
+    int k = r.getEnumOrdinal(n);
+    if (k < 0) {
+      return mkErrorAction("Unknown enum: " + n);
+    } else {
+      set(enumActions++, 1);
+      set(enumActions++, k);
+      return enumActions - 1;
+    }
+  }
+  
+  public final boolean isAction(int sym) {
+    return enums <= sym && sym < enumActions;
+  }
+  
+  public final boolean isErrorAction(int sym) {
+    return enums <= sym && sym < errorActions;
+  }
+
+  public final boolean isReaderUnionAction(int sym) {
+    return errorActions <= sym && sym < readerUnionActions;
+  }
+
+  public final boolean isResolverAction(int sym) {
+    return readerUnionActions <= sym && sym < resolverActions;
+  }
+
+  public final boolean isFieldAction(int sym) {
+    return resolverActions <= sym && sym < fieldActions;
+  }
+
+  public final boolean isSkipAction(int sym) {
+    return fieldActions <= sym && sym < skipActions;
+  }
+
+  public final boolean isWriterUnionAction(int sym) {
+    return skipActions <= sym && sym < writerUnionActions;
+  }
+
+  public final boolean isEnumAction(int sym) {
+    return writerUnionActions <= sym && sym < enumActions;
+  }
+
+  public final String getMessage(int errorAction) {
+    return strings[prods[errorAction]];
+  }
+
+  public final int getResolverActual(int resolverAction) {
+    return prods[resolverAction];
+  }
+
+  public final String getFieldName(int fieldAction) {
+    return strings[prods[fieldAction + 1]];
+  }
+
+  public final int getFieldIndex(int fieldAction) {
+    return prods[fieldAction];
+  }
+
+  public final int getProductionToSkip(int skipAction) {
+    return prods[skipAction];
+  }
+
+  public final int getReaderUnionIndex(int readerUnionAction) {
+    return prods[readerUnionAction];
+  }
+
+  public final int getReaderUnionSym(int readerUnionAction) {
+    return prods[readerUnionAction + 1];
+  }
+
+  public final int getEnumAction(int ntsym, int e) {
+    return prods[ntsym + e];
+  }
+
+  public final int getEnumValue(int enumAction) {
+    return prods[enumAction];
+  }
+
+  /** Clever trick which differentiates items put into
+    * <code>seen</code> by {@link #count(Schema,Map)}
+    * from those put in by {@link #count(Schema,Schema,Map)}. */
+  protected static class LitS2 extends LitS {
+    public Schema expected;
+    public LitS2(Schema actual, Schema expected) {
+      super(actual);
+      this.expected = expected;
+    }
+    public boolean equals(Object o) {
+      if (! (o instanceof LitS2)) return false;
+      LitS2 other = (LitS2)o;
+      return actual == other.actual && expected == other.expected;
+    }
+    public int hashCode() {
+      return super.hashCode() + expected.hashCode();
+    }
+  }
+  
+  public String toString() {
+    StringBuffer sb = new StringBuffer();
+    appendTo(sb, "root", root);
+    appendTo(sb, "errorActions", errorActions);
+    appendTo(sb, "readerUnionActions", readerUnionActions);
+    appendTo(sb, "resolverActions", resolverActions);
+    appendTo(sb, "nonTerminals", nonTerminals);
+    appendTo(sb, "unions", unions);
+    appendTo(sb, "repeaters", repeaters);
+    appendTo(sb, "fieldActions", fieldActions);
+    appendTo(sb, "skipActions", skipActions);
+    appendTo(sb, "writerUnionActions", writerUnionActions);
+    appendTo(sb, "enumActions", enumActions);
+    
+    appendTo(sb, prods);
+    return sb.toString();
+  }
+
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java?rev=788462&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java Thu Jun 25 18:53:16 2009
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+/** {@link Decoder} that ensures that the sequence of operations conforms
+ *  to a schema.
+ * <p>See the <a href="doc-files/parsing.html">parser documentation</a> for
+ *  information on how this works.
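+ *
+ * <p>A minimal usage sketch (the schema text, input stream, and variable
+ * names below are illustrative assumptions, not part of this commit):
+ * <pre>
+ *   Schema schema = Schema.parse(jsonSchemaText);
+ *   ValidatingDecoder in =
+ *     new ValidatingDecoder(schema, new BinaryDecoder(inputStream));
+ *   long v = in.readLong(); // AvroTypeException if the schema disagrees
+ * </pre>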
+ */
+public class ValidatingDecoder extends Decoder {
+  protected final Decoder in;
+  protected int[] stack;
+  protected int pos;
+  ParsingTable table;
+
+  ValidatingDecoder(ParsingTable table, Decoder in)
+    throws IOException {
+    this.in = in;
+    this.stack = new int[5]; // Start small to make sure expansion code works
+    this.pos = 0;
+    this.table = table;
+    reset();
+  }
+
+  public ValidatingDecoder(Schema schema, Decoder in)
+    throws IOException {
+    this(new ParsingTable(schema), in);
+  }
+
+  @Override
+  public void init(InputStream in) {
+    this.in.init(in);
+  }
+
+  @Override
+  public void readNull() throws IOException {
+    advance(ParsingTable.NULL);
+    in.readNull();
+  }
+    
+  @Override
+  public boolean readBoolean() throws IOException {
+    advance(ParsingTable.BOOLEAN);
+    return in.readBoolean();
+  }
+
+  @Override
+  public int readInt() throws IOException {
+    advance(ParsingTable.INT);
+    return in.readInt();
+  }
+    
+  @Override
+  public long readLong() throws IOException {
+    advance(ParsingTable.LONG);
+    return in.readLong();
+  }
+
+  @Override
+  public float readFloat() throws IOException {
+    advance(ParsingTable.FLOAT);
+    return in.readFloat();
+  }
+
+  @Override
+  public double readDouble() throws IOException {
+    advance(ParsingTable.DOUBLE);
+    return in.readDouble();
+  }
+    
+  @Override
+  public Utf8 readString(Utf8 old) throws IOException {
+    advance(ParsingTable.STRING);
+    return in.readString(old);
+  }
+
+  @Override
+  public void skipString() throws IOException {
+    advance(ParsingTable.STRING);
+    in.skipString();
+  }
+
+  @Override
+  public ByteBuffer readBytes(ByteBuffer old) throws IOException {
+    advance(ParsingTable.BYTES);
+    return in.readBytes(old);
+  }
+
+  @Override
+  public void skipBytes() throws IOException {
+    advance(ParsingTable.BYTES);
+    in.skipBytes();
+  }
+
+  @Override
+  public void readFixed(byte[] bytes, int start, int length) throws IOException {
+    advance(ParsingTable.FIXED);
+    int top = stack[--pos];
+    assert table.isFixed(top);
+    if (table.getFixedSize(top) != length) {
+      throw new AvroTypeException(
+        "Incorrect length for fiexed binary: expected " +
+        table.getFixedSize(top) + " but received " + length + " bytes.");
+    }
+    in.readFixed(bytes, start, length);
+  }
+
+  @Override
+  public void skipFixed(int length) throws IOException {
+    advance(ParsingTable.FIXED);
+    int top = stack[--pos];
+    assert table.isFixed(top);
+    if (table.getFixedSize(top) != length) {
+      throw new AvroTypeException(
+        "Incorrect length for fiexed binary: expected " +
+        table.getFixedSize(top) + " but received " + length + " bytes.");
+    }
+    in.skipFixed(length);
+  }
+
+  @Override
+  public int readEnum() throws IOException {
+    advance(ParsingTable.ENUM);
+    int top = stack[--pos];
+    assert table.isEnum(top);
+    int result = in.readEnum();
+    if (result < 0 || result >= table.getEnumMax(top)) {
+      throw new AvroTypeException(
+          "Enumeration out of range: max is " +
+          table.getEnumMax(top) + " but received " + result);
+    }
+    return result;
+  }
+
+  @Override
+  public long readArrayStart() throws IOException {
+    advance(ParsingTable.ARRAYSTART);
+    long result = in.readArrayStart();
+    if (result == 0) {
+      advance(ParsingTable.ARRAYEND);
+    }
+    return result;
+  }
+
+  @Override
+  public long arrayNext() throws IOException {
+    long result = in.arrayNext();
+    if (result == 0) {
+      advance(ParsingTable.ARRAYEND);
+    }
+    return result;
+  }
+
+  @Override
+  public long skipArray() throws IOException {
+    advance(ParsingTable.ARRAYSTART);
+    skipFullArray(stack[--pos]);
+    advance(ParsingTable.ARRAYEND);
+    return 0;
+  }
+
+  @Override
+  public long readMapStart() throws IOException {
+    advance(ParsingTable.MAPSTART);
+    long result = in.readMapStart();
+    if (result == 0) {
+      advance(ParsingTable.MAPEND);
+    }
+    return result;
+  }
+
+  @Override
+  public long mapNext() throws IOException {
+    long result = in.mapNext();
+    if (result == 0) {
+      advance(ParsingTable.MAPEND);
+    }
+    return result;
+  }
+
+  @Override
+  public long skipMap() throws IOException {
+    advance(ParsingTable.MAPSTART);
+    skipFullMap(stack[--pos]);
+    advance(ParsingTable.MAPEND);
+    return 0;
+  }
+
+  @Override
+  public int readIndex() throws IOException {
+    advance(ParsingTable.UNION);
+    int top = stack[--pos];
+    assert table.isUnion(top);
+    int result = in.readIndex();
+    stack[pos++] = table.getBranch(top, result);
+    return result;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    /*
+     * See the design note for ValidatingEncoder.reset() on why such
+     * a method is needed.
+     */
+    while (pos > 0) {
+      skipSymbol(stack[--pos], -1);
+    }
+    stack[pos++] = table.root;
+  }
+
+  /** Skip the values described by a production. */
+  protected void skipProduction(int ntsym) throws IOException {
+    if (! table.isNonTerminal(ntsym) && ! table.isRepeater(ntsym)) {
+      throw new IllegalArgumentException("Can't skip a " + ntsym);
+    }
+    for (int i = table.size(ntsym) - 1; i >= 0; i--) {
+      int sym = table.prods[ntsym + i];
+      if (table.isRepeater(sym))
+        continue; // Don't recurse -- our caller will do that for us
+
+      i = skipSymbol(sym, ntsym + i) - ntsym;
+    }
+  }
+
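+  /** Skips the writer's value for the single grammar symbol
+   *  <code>sym</code>.  For symbols with auxiliary entries in their
+   *  production (FIXED, UNION, ARRAYSTART, MAPSTART), <code>p</code> is
+   *  the symbol's position within that production and is walked
+   *  backwards to reach them; the updated position is returned. */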
+  protected int skipSymbol(int sym, int p) throws IOException {
+    switch (sym) {
+    case ParsingTable.NULL:
+      in.readNull();
+      break;
+    case ParsingTable.BOOLEAN:
+      in.readBoolean();
+      break;
+    case ParsingTable.INT:
+      in.readInt();
+      break;
+    case ParsingTable.LONG:
+      in.readLong();
+      break;
+    case ParsingTable.FLOAT:
+      in.readFloat();
+      break;
+    case ParsingTable.DOUBLE:
+      in.readDouble();
+      break;
+    case ParsingTable.STRING:
+      in.skipString();
+      break;
+    case ParsingTable.BYTES:
+      in.skipBytes();
+      break;
+    case ParsingTable.FIXED:
+      in.skipFixed(table.getFixedSize(table.prods[--p]));
+      break;
+    case ParsingTable.UNION:
+      skipSymbol(table.getBranch(table.prods[--p], in.readIndex()), -1);
+      break;
+    case ParsingTable.ARRAYSTART:
+      while (p-- >= 0) {
+        int element = table.prods[p];
+        if (table.isRepeater(element)) {
+          skipFullArray(element);
+          break;
+        }
+      }
+      while (table.prods[p] != ParsingTable.ARRAYEND) {
+        p--; // (skip action syms)
+      }
+      break;
+    case ParsingTable.MAPSTART:
+      while (p-- >= 0) {
+        int element = table.prods[p];
+        if (table.isRepeater(element)) {
+          skipFullMap(element);
+          break;
+        }
+      }
+      while (table.prods[p] != ParsingTable.MAPEND) {
+        p--; // (skip action syms)
+      }
+      break;
+    default:  // record
+      skipProduction(sym);
+    }
+    return p;
+  }
+
+  private final void skipFullMap(int element) throws IOException {
+    for (long c = in.skipMap(); c != 0; c = in.skipMap()) {
+      skipElements(element, c);
+    }
+  }
+
+  private final void skipElements(int element, long count) throws IOException {
+    while (count-- > 0) {
+      skipProduction(element);
+    }
+  }
+
+  private final void skipFullArray(int element) throws IOException {
+    for (long c = in.skipArray(); c != 0; c = in.skipArray()) {
+      skipElements(element, c);
+    }
+  }
+
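+  /** Pops the parsing stack, expanding any non-terminal on top into its
+   *  production until a terminal surfaces; that terminal must equal
+   *  <code>input</code>, else an AvroTypeException is thrown.  A
+   *  repeater on top is not re-expanded when <code>input</code> closes
+   *  the enclosing array or map. */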
+  protected int advance(int input) throws IOException {
+    int top = stack[--pos];
+    while (! table.isTerminal(top)) {
+      if (! table.isRepeater(top)
+          || (input != ParsingTable.ARRAYEND && input != ParsingTable.MAPEND)) {
+        int plen = table.size(top);
+        if (stack.length < pos + plen) {
+          stack = expand(stack, pos + plen);
+        }
+        System.arraycopy(table.prods, top, stack, pos, plen);
+        pos += plen;
+      }
+      top = stack[--pos];
+    }
+    assert table.isTerminal(top);
+    if (top != input) {
+      throw new AvroTypeException("Attempt to read a "
+          + ParsingTable.getTerminalName(input) + " when a "
+          + ParsingTable.getTerminalName(top) + " was expected.");
+    }
+    return top;
+  }
+
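+  /** Grows <code>stack</code> until it holds at least <code>len</code>
+   *  entries, doubling the array per step but never adding fewer than
+   *  1024 entries at a time. */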
+  protected static int[] expand(int[] stack, int len) {
+    while (stack.length < len) {
+      stack = Arrays.copyOf(stack, stack.length + Math.max(stack.length, 1024));
+    }
+    return stack;
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java?rev=788462&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java Thu Jun 25 18:53:16 2009
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.avro.AvroTypeException;
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+
+/** {@link Encoder} that ensures that the sequence of operations conforms
+ * to a schema.
+ *
+ * <p>See the <a href="doc-files/parsing.html">parser documentation</a> for
+ *  information on how this works.
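+ *
+ * <p>A minimal usage sketch (the schema and output stream are
+ * illustrative assumptions, not part of this commit):
+ * <pre>
+ *   ValidatingEncoder out =
+ *     new ValidatingEncoder(schema, new BinaryEncoder(outputStream));
+ *   out.writeLong(42L); // AvroTypeException if the schema disagrees
+ * </pre>
+ */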
+public class ValidatingEncoder extends Encoder {
+  private final Encoder out;
+  protected int[] stack;
+  protected int pos;
+  protected ParsingTable table;
+
+  public ValidatingEncoder(Schema schema, Encoder out) {
+    this.out = out;
+    this.stack = new int[5]; // Start small to make sure expansion code works
+    this.pos = 0;
+    this.table = new ParsingTable(schema);
+    stack[pos++] = table.root;
+  }
+
+  @Override
+  public void init(OutputStream out) throws IOException {
+    flush();
+    this.out.init(out);
+  }
+
+  @Override
+  public void flush() throws IOException {
+    out.flush();
+  }
+
+  @Override
+  public void writeNull() throws IOException {
+    advance(ParsingTable.NULL);
+    out.writeNull();
+  }
+    
+  @Override
+  public void writeBoolean(boolean b) throws IOException {
+    advance(ParsingTable.BOOLEAN);
+    out.writeBoolean(b);
+  }
+
+  @Override
+  public void writeInt(int n) throws IOException {
+    advance(ParsingTable.INT);
+    out.writeInt(n);
+  }
+
+  @Override
+  public void writeLong(long n) throws IOException {
+    advance(ParsingTable.LONG);
+    out.writeLong(n);
+  }
+    
+  @Override
+  public void writeFloat(float f) throws IOException {
+    advance(ParsingTable.FLOAT);
+    out.writeFloat(f);
+  }
+
+  @Override
+  public void writeDouble(double d) throws IOException {
+    advance(ParsingTable.DOUBLE);
+    out.writeDouble(d);
+  }
+
+  @Override
+  public void writeString(Utf8 utf8) throws IOException {
+    advance(ParsingTable.STRING);
+    out.writeString(utf8);
+  }
+
+  @Override
+  public void writeBytes(ByteBuffer bytes) throws IOException {
+    advance(ParsingTable.BYTES);
+    out.writeBytes(bytes);
+  }
+
+  @Override
+  public void writeBytes(byte[] bytes, int start, int len) throws IOException {
+    advance(ParsingTable.BYTES);
+    out.writeBytes(bytes, start, len);
+  }
+
+  @Override
+  public void writeFixed(byte[] bytes, int start, int length)
+    throws IOException {
+    advance(ParsingTable.FIXED);
+    int top = stack[--pos];
+    assert table.isFixed(top);
+    if (table.getFixedSize(top) != length) {
+      throw new AvroTypeException(
+        "Incorrect length for fiexed binary: expected " +
+        table.getFixedSize(top) + " but received " + length + " bytes.");
+    }
+    out.writeFixed(bytes, start, length);
+  }
+
+  @Override
+  public void writeEnum(int e) throws IOException {
+    advance(ParsingTable.ENUM);
+    int top = stack[--pos];
+    assert table.isEnum(top);
+    if (e < 0 || e >= table.getEnumMax(top)) {
+      throw new AvroTypeException(
+        "Enumeration out of range: max is " +
+        table.getEnumMax(top) + " but received " + e);
+    }
+    out.writeEnum(e);
+  }
+
+  @Override
+  public void writeArrayStart() throws IOException {
+    advance(ParsingTable.ARRAYSTART);
+    out.writeArrayStart();
+  }
+
+  @Override
+  public void setItemCount(long itemCount) throws IOException {
+    out.setItemCount(itemCount);
+  }
+
+  @Override
+  public void startItem() throws IOException {
+    out.startItem();
+  }
+
+  @Override
+  public void writeArrayEnd() throws IOException {
+    advance(ParsingTable.ARRAYEND);
+    out.writeArrayEnd();
+  }
+
+  @Override
+  public void writeMapStart() throws IOException {
+    advance(ParsingTable.MAPSTART);
+    out.writeMapStart();
+  }
+
+  @Override
+  public void writeMapEnd() throws IOException {
+    advance(ParsingTable.MAPEND);
+    out.writeMapEnd();
+  }
+
+  @Override
+  public void writeIndex(int unionIndex) throws IOException {
+    advance(ParsingTable.UNION);
+    int top = stack[--pos];
+    assert table.isUnion(top);
+    stack[pos++] = table.getBranch(top, unionIndex);
+    out.writeIndex(unionIndex);
+  }
+
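+  /** Same stack discipline as {@link ValidatingDecoder#advance(int)}:
+   *  expand non-terminals until a terminal surfaces, then check it
+   *  against <code>input</code>. */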
+  private int advance(int input) {
+    int top = stack[--pos];
+    while (! table.isTerminal(top)) {
+      if (! table.isRepeater(top)
+          || (input != ParsingTable.ARRAYEND && input != ParsingTable.MAPEND)) {
+        int plen = table.size(top);
+        if (stack.length < pos + plen) {
+          stack = ValidatingDecoder.expand(stack, pos + plen);
+        }
+        System.arraycopy(table.prods, top, stack, pos, plen);
+        pos += plen;
+      }
+      top = stack[--pos];
+    }
+    assert table.isTerminal(top);
+    if (top != input) {
+      throw new AvroTypeException("Attempt to write a " +
+          ParsingTable.getTerminalName(input) + " when a "
+          + ParsingTable.getTerminalName(top) + " was expected.");
+    }
+    return top;
+  }
+
+  /**
+   * After writing a complete object that conforms to the schema or after an
+   * error, if you want to start writing another
+   * object, call this method.
+   */
+  public void reset() {
+    /*
+     * Design note: Why is this a separate method? Why can't we auto reset
+     * in advance() when the stack becomes empty which is an indication that
+     * an object has been completed?
+     * 
+     * (1) advance() is called in an inner loop. Saving cycles there is worthwhile.
+     * (2) The client may actually be writing something beyond the current
+     * object and the writer will think it has started writing the next object.
+     * This may lead to certain errors not being caught.
+     * (3) We need reset() anyway to take the writer out of any error state.
+     */
+    pos = 0;
+    stack[pos++] = table.root;
+  }
+}

Propchange: hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/
------------------------------------------------------------------------------
    svn:mergeinfo = 


