avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r788481 - /hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
Date Thu, 25 Jun 2009 19:33:12 GMT
Author: cutting
Date: Thu Jun 25 19:33:11 2009
New Revision: 788481

URL: http://svn.apache.org/viewvc?rev=788481&view=rev
Log:
AVRO-29. Reverting changes to GenericDatumReader.

Modified:
    hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java

Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=788481&r1=788480&r2=788481&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Thu Jun 25 19:33:11 2009
@@ -18,57 +18,59 @@
 package org.apache.avro.generic;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
+import java.nio.ByteBuffer;
+
+import org.codehaus.jackson.JsonNode;
 
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.AvroTypeException;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
 import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.ResolvingDecoder;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.util.Utf8;
-import org.codehaus.jackson.JsonNode;
 
 /** {@link DatumReader} for generic Java objects. */
 public class GenericDatumReader<D> implements DatumReader<D> {
-  private Schema writerSchema;
-  private Schema readerSchema;
-  
-  public GenericDatumReader() { }
+  private Schema actual;
+  private Schema expected;
 
-  public GenericDatumReader(Schema schema) {
-    this.readerSchema = schema;
-  }
+  public GenericDatumReader() {}
 
-  public GenericDatumReader(Schema writerSchema, Schema readerSchema) {
-    this.writerSchema = writerSchema;
-    this.readerSchema = readerSchema;
+  public GenericDatumReader(Schema actual) {
+    setSchema(actual);
   }
 
-  public void setSchema(Schema schema) {
-    this.readerSchema = schema;
+  public GenericDatumReader(Schema actual, Schema expected) {
+    this(actual);
+    this.expected = expected;
   }
 
+  public void setSchema(Schema actual) { this.actual = actual; }
+
   @SuppressWarnings("unchecked")
   public D read(D reuse, Decoder in) throws IOException {
-    return (D) read(reuse, readerSchema,
-        writerSchema == null ? in :
-          new ResolvingDecoder(writerSchema, readerSchema, in));
+    return (D) read(reuse, actual, expected != null ? expected : actual, in);
   }
   
   /** Called to read data.*/
-  protected Object read(Object old, Schema schema, Decoder in)
-  	throws IOException {
-    switch (schema.getType()) {
-    case RECORD:  return readRecord(old, schema, in);
-    case ENUM:    return readEnum(schema, in);
-    case ARRAY:   return readArray(old, schema, in);
-    case MAP:     return readMap(old, schema, in);
-    case FIXED:   return readFixed(old, schema, in);
+  protected Object read(Object old, Schema actual,
+                        Schema expected, Decoder in) throws IOException {
+    if (actual.getType() == Type.UNION)           // resolve unions
+      actual = actual.getTypes().get((int)in.readIndex());
+    if (expected.getType() == Type.UNION)
+      expected = resolveExpected(actual, expected);
+    switch (actual.getType()) {
+    case RECORD:  return readRecord(old, actual, expected, in);
+    case ENUM:    return readEnum(actual, expected, in);
+    case ARRAY:   return readArray(old, actual, expected, in);
+    case MAP:     return readMap(old, actual, expected, in);
+    case FIXED:   return readFixed(old, actual, expected, in);
     case STRING:  return readString(old, in);
     case BYTES:   return readBytes(old, in);
     case INT:     return in.readInt();
@@ -77,62 +79,90 @@
     case DOUBLE:  return in.readDouble();
     case BOOLEAN: return in.readBoolean();
     case NULL:    return null;
-    case UNION:   return readUnion(old, schema, in);
-    default: throw new AvroRuntimeException("Unknown type: " + schema +
-        " " + schema.getType());
+    default: throw new AvroRuntimeException("Unknown type: "+actual);
     }
   }
 
+  private Schema resolveExpected(Schema actual, Schema expected) {
+    // first scan for exact match
+    for (Schema branch : expected.getTypes())
+      if (branch.getType() == actual.getType())
+        switch (branch.getType()) {
+        case RECORD:
+        case FIXED:
+          String name = branch.getName();
+          if (name == null || name.equals(actual.getName()))
+            return branch;
+          break;
+        default:
+          return branch;
+        }
+    // then scan match via numeric promotion
+    for (Schema branch : expected.getTypes())
+      switch (actual.getType()) {
+      case INT:
+        switch (expected.getType()) {
+        case LONG: case FLOAT: case DOUBLE:
+          return expected;
+        }
+        break;
+      case LONG:
+        switch (expected.getType()) {
+        case FLOAT: case DOUBLE:
+          return expected;
+        }
+        break;
+      case FLOAT:
+        switch (expected.getType()) {
+        case DOUBLE:
+          return expected;
+        }
+        break;
+      }
+    throw new AvroTypeException("Expected "+expected+", found "+actual);
+  }
+
   /** Called to read a record instance. May be overridden for alternate record
    * representations.*/
-  protected Object readRecord(Object old, Schema schema,
+  protected Object readRecord(Object old, Schema actual, Schema expected,
                               Decoder in) throws IOException {
-    if (in instanceof ResolvingDecoder) {
-      return readRecord(old, schema, (ResolvingDecoder) in);
-    }
-    Object record = newRecord(old, schema);
+    /* TODO: We may want to compute the expected and actual mapping and cache
+     * the mapping (keyed by <actual, expected>). */
+    String recordName = expected.getName();
+    if (recordName != null && !recordName.equals(actual.getName()))
+      throw new AvroTypeException("Expected "+expected+", found "+actual);
+    Map<String, Field> expectedFields = expected.getFields();
+    // all fields not in expected should be removed by newRecord.
+    Object record = newRecord(old, expected);
     int size = 0;
-    for (Map.Entry<String, Field> entry : schema.getFields().entrySet()) {
+    for (Map.Entry<String, Field> entry : actual.getFields().entrySet()) {
       String fieldName = entry.getKey();
-      Field field = entry.getValue();
-      int fieldPosition = field.pos();
+      Field actualField = entry.getValue();
+      Field expectedField =
+          expected == actual ? actualField : expectedFields.get(entry.getKey());
+      if (expectedField == null) {
+        skip(actualField.schema(), in);
+        continue;
+      }
+      int fieldPosition = expectedField.pos();
       Object oldDatum =
           (old != null) ? getField(record, fieldName, fieldPosition) : null;
       addField(record, fieldName, fieldPosition,
-               read(oldDatum, field.schema(), in));
+               read(oldDatum,actualField.schema(),expectedField.schema(), in));
       size++;
     }
-    return record;
-  }
-  
-  protected Object readRecord(Object old, Schema schema,
-      ResolvingDecoder in) throws IOException {
-    Object record = newRecord(old, schema);
-    Map<String, Field> readerFields = schema.getFields();
-
-    BitSet bs = new BitSet();
-    for (int i = 0; i < readerFields.size(); i++) {
-      String fn = in.readFieldName();
-      if (fn == null) {
-        break;
-      }
-      Field f = readerFields.get(fn);
-      int fp = f.pos();
-      bs.set(fp);
-      Object oldDatum =
-        (old != null) ? getField(record, fn, fp) : null;
-      addField(record, fn, fp, read(oldDatum, f.schema(), in));
-    }
-    for (Map.Entry<String, Field> entry : readerFields.entrySet()) {
-      Field f = entry.getValue();
-      if (! bs.get(f.pos())) {
+    if (expectedFields.size() > size) {           // not all fields set
+      Set<String> actualFields = actual.getFields().keySet();
+      for (Map.Entry<String, Field> entry : expectedFields.entrySet()) {
         String fieldName = entry.getKey();
-        JsonNode json = f.defaultValue();
-        if (json != null) {                     // has default
-          addField(record, fieldName, f.pos(),  // add default
-                   defaultFieldValue(old, f.schema(), json));
-        } else if (old != null) {               // remove stale value
-          removeField(record, fieldName, entry.getValue().pos());
+        if (!actualFields.contains(fieldName)) {  // an unset field
+          Field f = entry.getValue();
+          JsonNode json = f.defaultValue();
+          if (json != null)                       // has default
+            addField(record, fieldName, f.pos(),  // add default
+                     defaultFieldValue(old, f.schema(), json));
+          else if (old != null)                   // remove stale value
+            removeField(record, fieldName, entry.getValue().pos());
         }
       }
     }
@@ -207,16 +237,18 @@
     case DOUBLE:  return json.getDoubleValue();
     case BOOLEAN: return json.getBooleanValue();
     case NULL:    return null;
-    default: throw new AvroRuntimeException("Unknown type: " + schema);
+    default: throw new AvroRuntimeException("Unknown type: "+actual);
     }
   }
 
   /** Called to read an enum value. May be overridden for alternate enum
    * representations.  By default, returns the symbol as a String. */
-  protected Object readEnum(Schema schema, Decoder in)
+  protected Object readEnum(Schema actual, Schema expected, Decoder in)
     throws IOException {
-    return createEnum(schema.getEnumSymbols().get(in.readEnum()),
-        schema);
+    String name = expected.getName();
+    if (name != null && !name.equals(actual.getName()))
+      throw new AvroTypeException("Expected "+expected+", found "+actual);
+    return createEnum(actual.getEnumSymbols().get(in.readEnum()), expected);
   }
 
   /** Called to create an enum value. May be overridden for alternate enum
@@ -225,18 +257,19 @@
 
   /** Called to read an array instance.  May be overridden for alternate array
    * representations.*/
-  protected Object readArray(Object old, Schema schema,
+  protected Object readArray(Object old, Schema actual, Schema expected,
                              Decoder in) throws IOException {
-    Schema type = schema.getElementType();
+    Schema actualType = actual.getElementType();
+    Schema expectedType = expected.getElementType();
     long l = in.readArrayStart();
     if (l > 0) {
       Object array = newArray(old, (int) l);
       do {
         for (long i = 0; i < l; i++) {
-          addToArray(array,
-              read(peekArray(array), type, in));  
+          addToArray(array, read(peekArray(array), actualType, expectedType, in));  
         }
       } while ((l = in.arrayNext()) > 0);
+      
       return array;
     } else {
       return newArray(old, 0);
@@ -260,9 +293,10 @@
   
   /** Called to read a map instance.  May be overridden for alternate map
    * representations.*/
-  protected Object readMap(Object old, Schema schema,
+  protected Object readMap(Object old, Schema actual, Schema expected,
                            Decoder in) throws IOException {
-    Schema valueType = schema.getValueType();
+    Schema aValue = actual.getValueType();
+    Schema eValue = expected.getValueType();
     long l = in.readMapStart();
     Object map = newMap(old, (int) l);
     if (l > 0) {
@@ -270,7 +304,7 @@
         for (int i = 0; i < l; i++) {
           addToMap(map,
               readString(null, in),
-              read(null, valueType, in));
+              read(null, aValue, eValue, in));
         }
       } while ((l = in.mapNext()) > 0);
     }
@@ -286,11 +320,13 @@
   
   /** Called to read a fixed value. May be overridden for alternate fixed
    * representations.  By default, returns {@link GenericFixed}. */
-  protected Object readFixed(Object old, Schema schema,
+  protected Object readFixed(Object old, Schema actual, Schema expected,
                              Decoder in)
     throws IOException {
-    GenericFixed fixed = (GenericFixed) createFixed(old, schema);
-    in.readFixed(fixed.bytes(), 0, schema.getFixedSize());
+    if (!actual.equals(expected))
+      throw new AvroTypeException("Expected "+expected+", found "+actual);
+    GenericFixed fixed = (GenericFixed)createFixed(old, expected);
+    in.readFixed(fixed.bytes(), 0, actual.getFixedSize());
     return fixed;
   }
 
@@ -310,14 +346,6 @@
     System.arraycopy(bytes, 0, fixed.bytes(), 0, schema.getFixedSize());
     return fixed;
   }
-
-  private Object readUnion(Object old, Schema schema, Decoder in)
-    throws IOException {
-    int idx = in.readIndex();
-    Schema s = schema.getTypes().get(idx);
-    return read(old, s, in);
-  }
-
   /**
    * Called to create new record instances. Subclasses may override to use a
    * different record implementation. The returned instance must conform to the
@@ -380,4 +408,54 @@
    * override to use a different byte array representation.  By default, this
    * calls {@link ByteBuffer#wrap(byte[])}.*/
   protected Object createBytes(byte[] value) { return ByteBuffer.wrap(value); }
+
+  private static final Schema STRING_SCHEMA = Schema.create(Type.STRING);
+
+  /** Skip an instance of a schema. */
+  public static void skip(Schema schema, Decoder in) throws IOException {
+    switch (schema.getType()) {
+    case RECORD:
+      for (Map.Entry<String, Schema> entry : schema.getFieldSchemas())
+        skip(entry.getValue(), in);
+      break;
+    case ENUM:
+      in.readInt();
+      break;
+    case ARRAY:
+      Schema elementType = schema.getElementType();
+      for (long l = in.skipArray(); l > 0; l = in.skipArray()) {
+        for (long i = 0; i < l; i++) {
+          skip(elementType, in);
+        }
+      }
+      break;
+    case MAP:
+      Schema value = schema.getValueType();
+      for (long l = in.skipMap(); l > 0; l = in.skipMap()) {
+        for (long i = 0; i < l; i++) {
+          skip(STRING_SCHEMA, in);
+          skip(value, in);
+        }
+      }
+      break;
+    case UNION:
+      skip(schema.getTypes().get((int)in.readIndex()), in);
+      break;
+    case FIXED:
+      in.skipFixed(schema.getFixedSize());
+      break;
+    case STRING:
+    case BYTES:
+      in.skipBytes();
+      break;
+    case INT:     in.readInt();           break;
+    case LONG:    in.readLong();          break;
+    case FLOAT:   in.readFloat();         break;
+    case DOUBLE:  in.readDouble();        break;
+    case BOOLEAN: in.readBoolean();       break;
+    case NULL:                            break;
+    default: throw new RuntimeException("Unknown type: "+schema);
+    }
+  }
+
 }



Mime
View raw message