Return-Path: Delivered-To: apmail-hadoop-avro-commits-archive@minotaur.apache.org Received: (qmail 64532 invoked from network); 25 Jun 2009 18:53:40 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 25 Jun 2009 18:53:40 -0000 Received: (qmail 91131 invoked by uid 500); 25 Jun 2009 18:53:51 -0000 Delivered-To: apmail-hadoop-avro-commits-archive@hadoop.apache.org Received: (qmail 91106 invoked by uid 500); 25 Jun 2009 18:53:51 -0000 Mailing-List: contact avro-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: avro-dev@hadoop.apache.org Delivered-To: mailing list avro-commits@hadoop.apache.org Received: (qmail 91095 invoked by uid 99); 25 Jun 2009 18:53:51 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Jun 2009 18:53:51 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Jun 2009 18:53:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B32EC23888AD; Thu, 25 Jun 2009 18:53:17 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r788462 [1/3] - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/generic/ src/java/org/apache/avro/io/ src/java/org/apache/avro/io/doc-files/ src/test/java/org/apache/avro/io/ Date: Thu, 25 Jun 2009 18:53:17 -0000 To: avro-commits@hadoop.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090625185317.B32EC23888AD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Thu Jun 25 18:53:16 2009 New Revision: 788462 URL: http://svn.apache.org/viewvc?rev=788462&view=rev Log: AVRO-29. 
Add to Java a validating encoder and decoder, and a resolving decoder. Contributed by Thiruvalluvan M. G. & Raymie Stata. Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/ (with props) hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/parsing.html hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestParsingTable.java hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestResolvingIO.java hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestResolvingTable.java hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValidatingIO.java Modified: hadoop/avro/trunk/CHANGES.txt hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java Modified: hadoop/avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=788462&r1=788461&r2=788462&view=diff ============================================================================== --- hadoop/avro/trunk/CHANGES.txt (original) +++ hadoop/avro/trunk/CHANGES.txt Thu Jun 25 18:53:16 2009 @@ -41,6 +41,9 @@ AVRO-48. Add JSON parser for C. (Matt Massie via cutting) + AVRO-29. Add to Java a validating encoder & decoder, and a + resolving decoder. (Thiruvalluvan M. G. & Raymie Stata) + IMPROVEMENTS AVRO-11. 
Re-implement specific and reflect datum readers and Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=788462&r1=788461&r2=788462&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Thu Jun 25 18:53:16 2009 @@ -18,59 +18,57 @@ package org.apache.avro.generic; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.BitSet; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Set; -import java.nio.ByteBuffer; - -import org.codehaus.jackson.JsonNode; import org.apache.avro.AvroRuntimeException; -import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; import org.apache.avro.io.DatumReader; +import org.apache.avro.io.ResolvingDecoder; import org.apache.avro.io.Decoder; import org.apache.avro.util.Utf8; +import org.codehaus.jackson.JsonNode; /** {@link DatumReader} for generic Java objects. 
*/ public class GenericDatumReader implements DatumReader { - private Schema actual; - private Schema expected; - - public GenericDatumReader() {} + private Schema writerSchema; + private Schema readerSchema; + + public GenericDatumReader() { } - public GenericDatumReader(Schema actual) { - setSchema(actual); + public GenericDatumReader(Schema schema) { + this.readerSchema = schema; } - public GenericDatumReader(Schema actual, Schema expected) { - this(actual); - this.expected = expected; + public GenericDatumReader(Schema writerSchema, Schema readerSchema) { + this.writerSchema = writerSchema; + this.readerSchema = readerSchema; } - public void setSchema(Schema actual) { this.actual = actual; } + public void setSchema(Schema schema) { + this.readerSchema = schema; + } @SuppressWarnings("unchecked") public D read(D reuse, Decoder in) throws IOException { - return (D) read(reuse, actual, expected != null ? expected : actual, in); + return (D) read(reuse, readerSchema, + writerSchema == null ? 
in : + new ResolvingDecoder(writerSchema, readerSchema, in)); } /** Called to read data.*/ - protected Object read(Object old, Schema actual, - Schema expected, Decoder in) throws IOException { - if (actual.getType() == Type.UNION) // resolve unions - actual = actual.getTypes().get((int)in.readIndex()); - if (expected.getType() == Type.UNION) - expected = resolveExpected(actual, expected); - switch (actual.getType()) { - case RECORD: return readRecord(old, actual, expected, in); - case ENUM: return readEnum(actual, expected, in); - case ARRAY: return readArray(old, actual, expected, in); - case MAP: return readMap(old, actual, expected, in); - case FIXED: return readFixed(old, actual, expected, in); + protected Object read(Object old, Schema schema, Decoder in) + throws IOException { + switch (schema.getType()) { + case RECORD: return readRecord(old, schema, in); + case ENUM: return readEnum(schema, in); + case ARRAY: return readArray(old, schema, in); + case MAP: return readMap(old, schema, in); + case FIXED: return readFixed(old, schema, in); case STRING: return readString(old, in); case BYTES: return readBytes(old, in); case INT: return in.readInt(); @@ -79,90 +77,62 @@ case DOUBLE: return in.readDouble(); case BOOLEAN: return in.readBoolean(); case NULL: return null; - default: throw new AvroRuntimeException("Unknown type: "+actual); + case UNION: return readUnion(old, schema, in); + default: throw new AvroRuntimeException("Unknown type: " + schema + + " " + schema.getType()); } } - private Schema resolveExpected(Schema actual, Schema expected) { - // first scan for exact match - for (Schema branch : expected.getTypes()) - if (branch.getType() == actual.getType()) - switch (branch.getType()) { - case RECORD: - case FIXED: - String name = branch.getName(); - if (name == null || name.equals(actual.getName())) - return branch; - break; - default: - return branch; - } - // then scan match via numeric promotion - for (Schema branch : expected.getTypes()) - switch 
(actual.getType()) { - case INT: - switch (expected.getType()) { - case LONG: case FLOAT: case DOUBLE: - return expected; - } - break; - case LONG: - switch (expected.getType()) { - case FLOAT: case DOUBLE: - return expected; - } - break; - case FLOAT: - switch (expected.getType()) { - case DOUBLE: - return expected; - } - break; - } - throw new AvroTypeException("Expected "+expected+", found "+actual); - } - /** Called to read a record instance. May be overridden for alternate record * representations.*/ - protected Object readRecord(Object old, Schema actual, Schema expected, + protected Object readRecord(Object old, Schema schema, Decoder in) throws IOException { - /* TODO: We may want to compute the expected and actual mapping and cache - * the mapping (keyed by ). */ - String recordName = expected.getName(); - if (recordName != null && !recordName.equals(actual.getName())) - throw new AvroTypeException("Expected "+expected+", found "+actual); - Map expectedFields = expected.getFields(); - // all fields not in expected should be removed by newRecord. - Object record = newRecord(old, expected); + if (in instanceof ResolvingDecoder) { + return readRecord(old, schema, (ResolvingDecoder) in); + } + Object record = newRecord(old, schema); int size = 0; - for (Map.Entry entry : actual.getFields().entrySet()) { + for (Map.Entry entry : schema.getFields().entrySet()) { String fieldName = entry.getKey(); - Field actualField = entry.getValue(); - Field expectedField = - expected == actual ? actualField : expectedFields.get(entry.getKey()); - if (expectedField == null) { - skip(actualField.schema(), in); - continue; - } - int fieldPosition = expectedField.pos(); + Field field = entry.getValue(); + int fieldPosition = field.pos(); Object oldDatum = (old != null) ? 
getField(record, fieldName, fieldPosition) : null; addField(record, fieldName, fieldPosition, - read(oldDatum,actualField.schema(),expectedField.schema(), in)); + read(oldDatum, field.schema(), in)); size++; } - if (expectedFields.size() > size) { // not all fields set - Set actualFields = actual.getFields().keySet(); - for (Map.Entry entry : expectedFields.entrySet()) { + return record; + } + + protected Object readRecord(Object old, Schema schema, + ResolvingDecoder in) throws IOException { + Object record = newRecord(old, schema); + Map readerFields = schema.getFields(); + + BitSet bs = new BitSet(); + for (int i = 0; i < readerFields.size(); i++) { + String fn = in.readFieldName(); + if (fn == null) { + break; + } + Field f = readerFields.get(fn); + int fp = f.pos(); + bs.set(fp); + Object oldDatum = + (old != null) ? getField(record, fn, fp) : null; + addField(record, fn, fp, read(oldDatum, f.schema(), in)); + } + for (Map.Entry entry : readerFields.entrySet()) { + Field f = entry.getValue(); + if (! 
bs.get(f.pos())) { String fieldName = entry.getKey(); - if (!actualFields.contains(fieldName)) { // an unset field - Field f = entry.getValue(); - JsonNode json = f.defaultValue(); - if (json != null) // has default - addField(record, fieldName, f.pos(), // add default - defaultFieldValue(old, f.schema(), json)); - else if (old != null) // remove stale value - removeField(record, fieldName, entry.getValue().pos()); + JsonNode json = f.defaultValue(); + if (json != null) { // has default + addField(record, fieldName, f.pos(), // add default + defaultFieldValue(old, f.schema(), json)); + } else if (old != null) { // remove stale value + removeField(record, fieldName, entry.getValue().pos()); } } } @@ -237,18 +207,16 @@ case DOUBLE: return json.getDoubleValue(); case BOOLEAN: return json.getBooleanValue(); case NULL: return null; - default: throw new AvroRuntimeException("Unknown type: "+actual); + default: throw new AvroRuntimeException("Unknown type: " + schema); } } /** Called to read an enum value. May be overridden for alternate enum * representations. By default, returns the symbol as a String. */ - protected Object readEnum(Schema actual, Schema expected, Decoder in) + protected Object readEnum(Schema schema, Decoder in) throws IOException { - String name = expected.getName(); - if (name != null && !name.equals(actual.getName())) - throw new AvroTypeException("Expected "+expected+", found "+actual); - return createEnum(actual.getEnumSymbols().get(in.readEnum()), expected); + return createEnum(schema.getEnumSymbols().get(in.readEnum()), + schema); } /** Called to create an enum value. May be overridden for alternate enum @@ -257,19 +225,18 @@ /** Called to read an array instance. 
May be overridden for alternate array * representations.*/ - protected Object readArray(Object old, Schema actual, Schema expected, + protected Object readArray(Object old, Schema schema, Decoder in) throws IOException { - Schema actualType = actual.getElementType(); - Schema expectedType = expected.getElementType(); + Schema type = schema.getElementType(); long l = in.readArrayStart(); if (l > 0) { Object array = newArray(old, (int) l); do { for (long i = 0; i < l; i++) { - addToArray(array, read(peekArray(array), actualType, expectedType, in)); + addToArray(array, + read(peekArray(array), type, in)); } } while ((l = in.arrayNext()) > 0); - return array; } else { return newArray(old, 0); @@ -293,10 +260,9 @@ /** Called to read a map instance. May be overridden for alternate map * representations.*/ - protected Object readMap(Object old, Schema actual, Schema expected, + protected Object readMap(Object old, Schema schema, Decoder in) throws IOException { - Schema aValue = actual.getValueType(); - Schema eValue = expected.getValueType(); + Schema valueType = schema.getValueType(); long l = in.readMapStart(); Object map = newMap(old, (int) l); if (l > 0) { @@ -304,7 +270,7 @@ for (int i = 0; i < l; i++) { addToMap(map, readString(null, in), - read(null, aValue, eValue, in)); + read(null, valueType, in)); } } while ((l = in.mapNext()) > 0); } @@ -320,13 +286,11 @@ /** Called to read a fixed value. May be overridden for alternate fixed * representations. By default, returns {@link GenericFixed}. 
*/ - protected Object readFixed(Object old, Schema actual, Schema expected, + protected Object readFixed(Object old, Schema schema, Decoder in) throws IOException { - if (!actual.equals(expected)) - throw new AvroTypeException("Expected "+expected+", found "+actual); - GenericFixed fixed = (GenericFixed)createFixed(old, expected); - in.readFixed(fixed.bytes(), 0, actual.getFixedSize()); + GenericFixed fixed = (GenericFixed) createFixed(old, schema); + in.readFixed(fixed.bytes(), 0, schema.getFixedSize()); return fixed; } @@ -346,6 +310,14 @@ System.arraycopy(bytes, 0, fixed.bytes(), 0, schema.getFixedSize()); return fixed; } + + private Object readUnion(Object old, Schema schema, Decoder in) + throws IOException { + int idx = in.readIndex(); + Schema s = schema.getTypes().get(idx); + return read(old, s, in); + } + /** * Called to create new record instances. Subclasses may override to use a * different record implementation. The returned instance must conform to the @@ -408,54 +380,4 @@ * override to use a different byte array representation. By default, this * calls {@link ByteBuffer#wrap(byte[])}.*/ protected Object createBytes(byte[] value) { return ByteBuffer.wrap(value); } - - private static final Schema STRING_SCHEMA = Schema.create(Type.STRING); - - /** Skip an instance of a schema. 
*/ - public static void skip(Schema schema, Decoder in) throws IOException { - switch (schema.getType()) { - case RECORD: - for (Map.Entry entry : schema.getFieldSchemas()) - skip(entry.getValue(), in); - break; - case ENUM: - in.readInt(); - break; - case ARRAY: - Schema elementType = schema.getElementType(); - for (long l = in.skipArray(); l > 0; l = in.skipArray()) { - for (long i = 0; i < l; i++) { - skip(elementType, in); - } - } - break; - case MAP: - Schema value = schema.getValueType(); - for (long l = in.skipMap(); l > 0; l = in.skipMap()) { - for (long i = 0; i < l; i++) { - skip(STRING_SCHEMA, in); - skip(value, in); - } - } - break; - case UNION: - skip(schema.getTypes().get((int)in.readIndex()), in); - break; - case FIXED: - in.skipFixed(schema.getFixedSize()); - break; - case STRING: - case BYTES: - in.skipBytes(); - break; - case INT: in.readInt(); break; - case LONG: in.readLong(); break; - case FLOAT: in.readFloat(); break; - case DOUBLE: in.readDouble(); break; - case BOOLEAN: in.readBoolean(); break; - case NULL: break; - default: throw new RuntimeException("Unknown type: "+schema); - } - } - } Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java?rev=788462&r1=788461&r2=788462&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryDecoder.java Thu Jun 25 18:53:16 2009 @@ -189,11 +189,6 @@ } @Override - public void readFixed(byte[] bytes) throws IOException { - readFixed(bytes, 0, bytes.length); - } - - @Override public void skipFixed(int length) throws IOException { doSkipBytes(length); } Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java?rev=788462&r1=788461&r2=788462&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryEncoder.java Thu Jun 25 18:53:16 2009 @@ -21,7 +21,6 @@ import java.io.OutputStream; import java.nio.ByteBuffer; -import org.apache.avro.AvroTypeException; import org.apache.avro.ipc.ByteBufferOutputStream; import org.apache.avro.util.Utf8; @@ -134,11 +133,6 @@ } @Override - public void writeString(String str) throws IOException { - writeString(new Utf8(str)); - } - - @Override public void writeBytes(ByteBuffer bytes) throws IOException { byteWriter.write(bytes); } @@ -150,21 +144,11 @@ } @Override - public void writeBytes(byte[] bytes) throws IOException { - writeBytes(bytes, 0, bytes.length); - } - - @Override public void writeFixed(byte[] bytes, int start, int len) throws IOException { out.write(bytes, start, len); } @Override - public void writeFixed(byte[] bytes) throws IOException { - writeFixed(bytes, 0, bytes.length); - } - - @Override public void writeEnum(int e) throws IOException { encodeLong(e, out); } Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java?rev=788462&r1=788461&r2=788462&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java Thu Jun 25 18:53:16 2009 @@ -17,7 +17,6 @@ */ package org.apache.avro.io; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -143,7 +142,9 @@ * value to be read or the length is incorrect. 
* @throws IOException */ - public abstract void readFixed(byte[] bytes) throws IOException; + public void readFixed(byte[] bytes) throws IOException { + readFixed(bytes, 0, bytes.length); + } /** * Discards fixed sized binary object. @@ -276,4 +277,11 @@ */ public abstract int readIndex() throws IOException; + /** + * After reading a complete object that conforms to the schema or after an + * error, if you want to start reading another object, call this method. + */ + + public void reset() throws IOException { + } } Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java?rev=788462&r1=788461&r2=788462&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/Encoder.java Thu Jun 25 18:53:16 2009 @@ -107,7 +107,9 @@ * @throws AvroTypeException If this is a stateful writer and a * char-string is not expected */ - public abstract void writeString(String str) throws IOException; + public void writeString(String str) throws IOException { + writeString(new Utf8(str)); + } /** * Write a byte string. @@ -130,7 +132,9 @@ * @throws AvroTypeException If this is a stateful writer and a * byte-string is not expected */ - public abstract void writeBytes(byte[] bytes) throws IOException; + public void writeBytes(byte[] bytes) throws IOException { + writeBytes(bytes, 0, bytes.length); + } /** * Writes a fixed size binary object. @@ -148,7 +152,9 @@ * A shorthand for writeFixed(bytes, 0, bytes.length) * @param bytes */ - public abstract void writeFixed(byte[] bytes) throws IOException; + public void writeFixed(byte[] bytes) throws IOException { + writeFixed(bytes, 0, bytes.length); + } /** * Writes an enumeration. 
Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java?rev=788462&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java Thu Jun 25 18:53:16 2009 @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.AvroTypeException; + +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +/** + * The parsing table. + * + */ +class ParsingTable { + /** + * The terminal symbols for the grammar. Symbols are also index into + * production table {@link #prods}. Since terminals do not have productions, + * we can use negative values. 
+ */ + protected static final int NULL = -1; + protected static final int BOOLEAN = NULL - 1; + protected static final int INT = BOOLEAN - 1; + protected static final int LONG = INT - 1; + protected static final int FLOAT = LONG - 1; + protected static final int DOUBLE = FLOAT - 1; + protected static final int STRING = DOUBLE - 1; + protected static final int BYTES = STRING - 1; + protected static final int FIXED = BYTES - 1; + protected static final int ENUM = FIXED - 1; + protected static final int UNION = ENUM - 1; + + protected static final int ARRAYSTART = UNION - 1; + protected static final int ARRAYEND = ARRAYSTART - 1; + protected static final int MAPSTART = ARRAYEND - 1; + protected static final int MAPEND = MAPSTART - 1; + + protected static final int LASTPRIM = MAPEND; + + private static final String[] terminalNames = { + "", "null", "boolean", "int", "long", "float", "double", "string", "bytes", + "union", "array-start", "array-end", "map-start", "map-end", + }; + + protected int root; + + /** + * If {@link #secondPass} == false the number entries in + * {@link #prods} for non-terminals. Otherwise one past the last entry + * for non-terminals in {@link #prods}. + */ + protected int nonTerminals; + + /** + * If {@link #secondPass} == false the number entries in + * {@link #prods} for repeaters. Otherwise one past the last entry + * for repeaters in {@link #prods}. + */ + protected int repeaters; + + /** + * If {@link #secondPass} == false the number entries in + * {@link #prods} for unions. Otherwise one past the last entry + * for unions in {@link #prods}. + */ + protected int unions; + + /** + * If {@link #secondPass} == false the number entries in + * {@link #prods} for fixeds. Otherwise one past the last entry + * for fixeds in {@link #prods}. + */ + protected int fixeds; + + /** + * If {@link #secondPass} == false the number entries in + * {@link #prods} for enums. Otherwise one past the last entry + * for enums in {@link #prods}. 
+ */ + protected int enums; + + /** + * The productions for non-terminals, repeaters and unions. + */ + protected int[] prods; + + /** + * In the two-pass construction of the table, false during + * the first pass and true during the second pass. + */ + protected boolean secondPass = false; + + protected ParsingTable() { } + + public ParsingTable(Schema sc) { + generate(sc, new HashMap()); + + int total = nonTerminals + repeaters + unions + fixeds + enums; + prods = new int[total]; + enums = total - enums; + fixeds = enums - fixeds; + unions = fixeds - unions; + repeaters = unions - repeaters; + nonTerminals = repeaters - nonTerminals; + + secondPass = true; + root = generate(sc, new HashMap()); + } + + /** + * Returns the id for the non-terminal that is the start symbol + * for the given schema sc. If there is already an entry + * for the given schema in the given map seen then + * the id corresponding to that entry is returned. Otherwise + * a new id is generated and an entry is inserted into the map. + * @param sc + * @param seen A map of schema to id mapping done so far.
+ * @return + */ + protected final int generate(Schema sc, Map seen) { + switch (sc.getType()) { + case NULL: + return NULL; + case BOOLEAN: + return BOOLEAN; + case INT: + return INT; + case LONG: + return LONG; + case FLOAT: + return FLOAT; + case DOUBLE: + return DOUBLE; + case STRING: + return STRING; + case BYTES: + return BYTES; + case FIXED: + return mkNonTerm(FIXED, mkFixed(sc.getFixedSize())); + case ENUM: + return mkNonTerm(ENUM, mkEnum(sc.getEnumSymbols().size())); + case ARRAY: + int ar_et = generate(sc.getElementType(), seen); + int r_et = mkRepeater(ar_et); + return mkNonTerm(ARRAYSTART, r_et, ARRAYEND); + case MAP: + int ar_vt = generate(sc.getValueType(), seen); + int r_vt = mkRepeater(STRING, ar_vt); + return mkNonTerm(MAPSTART, r_vt, MAPEND); + case RECORD: + LitS wsc = new LitS(sc); + Integer rresult = seen.get(wsc); + if (rresult == null) { + int size = sc.getFields().size(); + rresult = allocNonTerm(size); + int i = rresult + size; + for (Field f : sc.getFields().values()) { + set(--i, generate(f.schema(), seen)); + } + seen.put(wsc, rresult); + } + return rresult; + + case UNION: + List subs = sc.getTypes(); + int u = allocUnion(subs.size()); + for (Schema b : sc.getTypes()) { + set(u++, generate(b, seen)); + } + return mkNonTerm(UNION, u - subs.size()); + + default: + throw new RuntimeException("Unexpected schema type"); + } + } + + protected final int set(int index, int value) { + if (secondPass) { + prods[index] = value; + } + return value; + } + + /** Allocates a new non-terminal which in turn uses len + * new non-terminals. + * Each non-terminal has a unique integer id. + * @param len Number of non-terminals used by the freshly allocated + * non-terminal. + * @return The id for the new non-terminal allocated. 
+ */ + protected final int allocNonTerm(int len) { + set(nonTerminals, len); + nonTerminals += (len + 1); + return nonTerminals - len; + } + + protected final int mkNonTerm(int e1) { + int nt = nonTerminals; + set(nt, e1); + nonTerminals += 2; + return nt; + } + + protected final int mkNonTerm(int e1, int e2) { + set(nonTerminals++, 2); + set(nonTerminals++, e2); + set(nonTerminals++, e1); + return nonTerminals - 2; + } + + protected final int mkNonTerm(int e1, int e2, int e3) { + set(nonTerminals++, 3); + set(nonTerminals++, e3); + set(nonTerminals++, e2); + set(nonTerminals++, e1); + return nonTerminals - 3; + } + + protected final int mkRepeater(int e1) { + int i = repeaters; + set(i++, 2); + set(i, i) /*recursion*/; + set(i + 1, e1); + repeaters += 3; + return i; + } + + protected final int mkRepeater(int e1, int e2) { + int i = repeaters; + set(i++, 3); + set(i, i) /* recursion */; + set(i + 1, e2); + set(i + 2, e1); + repeaters += 4; + return i; + } + + protected final int allocUnion(int len) { + set(unions, len); + unions += (len + 1); + return unions - len; + } + + protected final int mkFixed(int size) { + set(fixeds++, 1); + set(fixeds++, size); + return fixeds - 1; + } + + protected final int mkEnum(int max) { + set(enums++, 1); + set(enums++, max); + return enums - 1; + } + + public final int size(int sym) { + return prods[sym - 1]; + } + + /** + * Returns true iff the given symbol sym is terminal. 
+ * @param sym The symbol that needs + * @return + */ + public final boolean isTerminal(int sym) { + return sym < 0; + } + + public final boolean isNonTerminal(int sym) { + return 0 <= sym && sym < nonTerminals; + } + + public final boolean isRepeater(int sym) { + return nonTerminals <= sym && sym < repeaters; + } + + public final boolean isUnion(int sym) { + return repeaters <= sym && sym < unions; + } + + public final boolean isFixed(int sym) { + return unions <= sym && sym < fixeds; + } + + public final boolean isEnum(int sym) { + return fixeds <= sym && sym < enums; + } + + public final int getBranch(int union, int unionIndex) { + // assert isUnion(union); + if (unionIndex < 0 || size(union) <= unionIndex) { + throw new AvroTypeException("Union index out of bounds (" + + unionIndex + ")"); + } + return prods[union + unionIndex]; + } + + public final int getFixedSize(int sym) { + return prods[sym]; + } + + public final int getEnumMax(int sym) { + return prods[sym]; + } + + /** A wrapper around Schema that does "==" equality. */ + protected static class LitS { + public final Schema actual; + public LitS(Schema actual) { this.actual = actual; } + + /** + * Two LitS are equal if and only if their underlying schema is + * the same (not merely equal). + */ + public boolean equals(Object o) { + if (! (o instanceof LitS)) return false; + return actual == ((LitS)o).actual; + } + + public int hashCode() { + return actual.hashCode(); + } + } + + /** + * Returns the name for the terminal. Useful for generating diagnostic + * messages. + * @param n The terminal symbol for which the name is required. 
+ * @return + */ + public static String getTerminalName(int n) { + assert n < 0 && n >= LASTPRIM; + return terminalNames[-n]; + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + appendTo(sb, "root", root); + appendTo(sb, "enums", enums); + appendTo(sb, "fixeds", fixeds); + appendTo(sb, "unions", unions); + appendTo(sb, "repeaters", repeaters); + + appendTo(sb, prods); + return sb.toString(); + } + + protected static void appendTo(StringBuffer sb, int[] prods) { + sb.append('['); + for (int i = 0; i < prods.length; i++) { + if (i != 0) { + sb.append(", "); + } + sb.append(prods[i]); + } + sb.append(']'); + } + + protected static void appendTo(StringBuffer sb, String name, int value) { + sb.append(name); + sb.append(" = "); + sb.append(value); + sb.append(", "); + } + +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java?rev=788462&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java Thu Jun 25 18:53:16 2009 @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import java.io.IOException; + +import org.apache.avro.AvroTypeException; +import org.apache.avro.Schema; + +/** + * {@link Decoder} that peforms type-resolution between the reader's and + * writer's schemas. + * + *

When resolving schemas, this class will return the values of fields in + * _writer's_ order, not the reader's order. (However, it returns _only_ the + * reader's fields, not any extra fields the writer may have written.) To help + * clients handle fields that appear to be coming out of order, this class + * defines the methods {@link #readFieldName} and {@link #readFieldIndex}. + * When called just before reading the value of a record-field, they return the + * name/index of the field about to be read. See {@link #readFieldIndex} for + * usage details. + * + *

See the parser documentation for + * information on how this works.*/ +public class ResolvingDecoder extends ValidatingDecoder { + private ResolvingTable rtable; + + public ResolvingDecoder(Schema writer, Schema reader, Decoder in) + throws IOException { + super(new ResolvingTable(writer, reader), in); + rtable = (ResolvingTable) table; + } + + /** Returns the name of the next field of the record we're reading. + * Similar to {@link #readFieldIndex} -- see that method for + * details. + * + * @throws IllegalStateExcpetion If we're not about to read a record-field + */ + public String readFieldName() throws IOException { + int actual = advance(ResolvingTable.FIELDACTION); + return rtable.getFieldName(actual); + } + + /** Returns the (zero-based) index of the next field of the record + * we're reading. + * + * This method is useful because {@link ResolvingDecoder} + * returns values in the order written by the writer, rather than + * the order expected by the reader. This method allows reader's + * to figure out what fields to expect. Let's say the reader is + * expecting a three-field record, the first field is a long, the + * second a string, and the third an array. In this case, a + * typical usage might be as follows: + *

+    *   for (int i = 0; i < 3; i++) {
+    *     switch (in.readFieldIndex()) {
+    *     case 1:
+    *       foo(in.readLong());
+    *       break;
+    *     case 2:
+    *       someVariable = in.readString();
+    *       break;
+    *     case 3:
+    *       bar(in); // The code of "bar" will read an array-of-int
+    *       break;
+    *     }
+    * 
+ * Note that {@link ResolvingDecoder} will return only the + * fields expected by the reader, not other fields that may have + * been written by the writer. Thus, the iteration-count of "3" in + * the above loop will always be correct. + * + * Throws a runtime exception if we're not just about to read the + * field of a record. Also, this method (and {@link + * #readFieldName}) will consume the field information, and + * thus may only be called once before reading the field + * value. (However, if the client knows the order of incoming + * fields and does not need to reorder them, then the client does + * not need to call this or {@link #readFieldName}.) + * + * @throws IllegalStateExcpetion If we're not about to read a record-field + * + */ + public int readFieldIndex() throws IOException { + int actual = advance(ResolvingTable.FIELDACTION); + return rtable.getFieldIndex(actual); + } + + @Override + public long readLong() throws IOException { + int actual = advance(ResolvingTable.LONG); + if (actual == ResolvingTable.INT) { + return in.readInt(); + } else if (actual == ResolvingTable.DOUBLE) { + return (long) in.readDouble(); + } else { + assert actual == ResolvingTable.LONG; + return in.readLong(); + } + } + + @Override + public double readDouble() throws IOException { + int actual = advance(ResolvingTable.DOUBLE); + if (actual == ResolvingTable.INT) { + return (double) in.readInt(); + } else if (actual == ResolvingTable.LONG) { + return (double) in.readLong(); + } else if (actual == ResolvingTable.FLOAT) { + return (double) in.readFloat(); + } else { + assert actual == ResolvingTable.DOUBLE; + return in.readDouble(); + } + } + + @Override + public int readEnum() throws IOException { + advance(ResolvingTable.ENUM); + int top = stack[--pos]; + int n = in.readEnum(); + if (n >= 0 && n < rtable.size(top)) { + n = rtable.getEnumAction(top, n); + if (rtable.isEnumAction(n)) { + return rtable.getEnumValue(n); + } else { + assert rtable.isErrorAction(n); + throw new 
AvroTypeException(rtable.getMessage(n)); + } + } else { + throw new AvroTypeException("Enumeration out of range: " + n + + " max: " + rtable.size(top)); + } + } + + @Override + public int readIndex() throws IOException { + advance(ParsingTable.UNION); + int actual = stack[--pos]; + if (rtable.isUnion(actual)) { + actual = rtable.getBranch(actual, in.readInt()); + } + if (rtable.isReaderUnionAction(actual)) { + // Both reader and writer where a union. Based on + // the writer's actual branch, go get the appropriate + // readerUnionAction + stack[pos++] = rtable.getReaderUnionSym(actual); + return rtable.getReaderUnionIndex(actual); + } else { + throw new AvroTypeException("Unexpected index read"); + } + } + + @Override + protected int skipSymbol(int sym, int p) throws IOException { + if (rtable.isResolverAction(sym)) { + return skipSymbol(rtable.getResolverActual(sym), -1); + } else { + return super.skipSymbol(sym, p); + } + } + + @Override + protected int advance(int input) throws IOException { + int top = stack[--pos]; + while (! rtable.isTerminal(top)) { + if (rtable.isAction(top)) { + if (rtable.isFieldAction(top)) { + if (input == ResolvingTable.FIELDACTION) return top; + } else if (rtable.isResolverAction(top)) { + return rtable.getResolverActual(top); + } else if (rtable.isSkipAction(top)) { + skipSymbol(rtable.getProductionToSkip(top), -1); + } else if (rtable.isWriterUnionAction(top)) { + stack[pos++] = rtable.getBranch(top, in.readIndex()); + } else if (rtable.isErrorAction(top)) { + throw new AvroTypeException(rtable.getMessage(top)); + } + } else if (! 
rtable.isRepeater(top) + || (input != ParsingTable.ARRAYEND + && input != ParsingTable.MAPEND)) { + int plen = rtable.size(top); + if (stack.length < pos + plen) { + stack = expand(stack, pos + plen); + } + System.arraycopy(rtable.prods, top, stack, pos, plen); + pos += plen; + } + top = stack[--pos]; + } + if (top == input) { + return top; + } + throw new AvroTypeException("Attempt to read " + input + + " when a " + top + " was expected."); + } + public void reset() throws IOException { + while (pos > 0) { + if (rtable.isSkipAction(stack[pos - 1])) { + skipProduction(rtable.getProductionToSkip(stack[--pos])); + } else { + throw new AvroTypeException("Data not fully drained."); + } + } + stack[pos++] = table.root; + } +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java?rev=788462&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java Thu Jun 25 18:53:16 2009 @@ -0,0 +1,489 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; + +import java.util.*; + +/** + * The parsing table for "resolving" between two schemas. This class + * is useful for validating inputs and not outputs. There is a reader's + * schema and a writer's schema. The reader is interested in data according to + * the reader's schema. But the data itself has been written according to + * the writer's schema. While reading the reader gets an impression that + * the data is in the reader's schema (except one change mentioned below). + * The schema resolution process does the required transformation(s). Of course, + * for this to work, the two schemas must be "compatible". Though some + * of the incompatibilities can be statically detected by looking at the two + * schemas, we report the incompatibilities only when data is sought to be read. + * + * While reading records, the fileds are returned according to the + * writer's schema and not the reader's. This apparently un-intuitive method + * is used in order to avoid caching at the ResolvingDecoder level. + */ +class ResolvingTable extends ParsingTable { + public static final int FIELDACTION = LASTPRIM - 1; + + protected int errorActions; + protected int readerUnionActions; + protected int resolverActions; + protected int fieldActions; + protected int skipActions; + protected int writerUnionActions; + protected int enumActions; + + private String[] strings; + // Index into strings array. Used during construction. 
+ private int spos; + + public ResolvingTable(Schema writer, Schema reader) { + generate(writer, reader, new HashMap()); + + int total = (errorActions + readerUnionActions + resolverActions + + nonTerminals + repeaters + unions + fixeds + enums + + fieldActions + skipActions + writerUnionActions + + enumActions); + + prods = new int[total]; + strings = new String[errorActions + fieldActions]; + + enumActions = total - enumActions; + writerUnionActions = enumActions - writerUnionActions; + skipActions = writerUnionActions - skipActions; + fieldActions = skipActions - fieldActions; + resolverActions = fieldActions - resolverActions; + readerUnionActions = resolverActions - readerUnionActions; + errorActions = readerUnionActions - errorActions; + enums = errorActions - enums; + fixeds = enums - fixeds; + unions = fixeds - unions; + repeaters = unions - repeaters; + nonTerminals = repeaters - nonTerminals; + spos = 0; + + secondPass = true; + root = generate(writer, reader, new HashMap()); + } + + protected final int generate(Schema writer, Schema reader, + Map seen) { + Schema.Type writerType = writer.getType(); + Schema.Type readerType = reader.getType(); + + if (writerType == readerType) { + switch (writerType) { + case NULL: + return NULL; + case BOOLEAN: + return BOOLEAN; + case INT: + return INT; + case LONG: + return LONG; + case FLOAT: + return FLOAT; + case DOUBLE: + return DOUBLE; + case STRING: + return STRING; + case BYTES: + return BYTES; + } + } + + if (writerType == Schema.Type.UNION && readerType != Schema.Type.UNION) { + List branches = writer.getTypes(); + int u = allocWriterUnionAction(branches.size()); + for (Schema w : branches) { + set(u++, generate(w, reader, seen)); + } + return u - branches.size(); + } + + switch (readerType) { + case NULL: + case BOOLEAN: + case INT: + case STRING: + case FLOAT: + case BYTES: + return mkErrorAction("Found " + writer + ", expecting " + reader); + + case LONG: + switch (writerType) { + case INT: + case DOUBLE: + 
case FLOAT: + return mkResolverAction(generate(writer, seen)); + default: + return mkErrorAction("Found " + writer + ", expecting " + reader); + } + + case DOUBLE: + switch (writerType) { + case INT: + case LONG: + case FLOAT: + return mkResolverAction(generate(writer, seen)); + default: + return mkErrorAction("Found " + writer + ", expecting " + reader); + } + + case FIXED: + if (writerType != Schema.Type.FIXED + || (writer.getName() != null + && ! writer.getName().equals(reader.getName()))) { + return mkErrorAction("Found " + writer + ", expecting " + reader); + } else { + return (writer.getFixedSize() == reader.getFixedSize()) ? + mkNonTerm(FIXED, mkFixed(reader.getFixedSize())) : + mkErrorAction("Size mismatch in fixed field: found " + + writer.getFixedSize() + ", expecting " + reader.getFixedSize()); + } + case ENUM: + if (writerType != Schema.Type.ENUM + || (writer.getName() != null + && ! writer.getName().equals(reader.getName()))) { + return mkErrorAction("Found " + writer + ", expecting " + reader); + } else { + int result = allocEnumAction(writer.getEnumSymbols().size()); + int u = result; + for (String n : writer.getEnumSymbols()) { + set(u++, mkEnumAction(n, reader)); + } + return mkNonTerm(ENUM, result); + } + + case ARRAY: + if (writerType != Schema.Type.ARRAY) { + return mkErrorAction("Found " + writer + ", expecting " + reader); + } else { + int ar_et = generate(writer.getElementType(), + reader.getElementType(), seen); + int r_et = mkRepeater(ar_et); + return mkNonTerm(ARRAYSTART, r_et, ARRAYEND); + } + + case MAP: + if (writerType != Schema.Type.MAP) { + return mkErrorAction("Found " + writer + ", expecting " + reader); + } else { + int ar_vt = generate(writer.getValueType(), + reader.getValueType(), + seen); + int r_vt = mkRepeater(STRING, ar_vt); + return mkNonTerm(MAPSTART, r_vt, MAPEND); + } + + case RECORD: + LitS wsc = new LitS2(writer, reader); + if (seen.get(wsc) == null) { + int result; + if (writerType != Schema.Type.RECORD + || 
(writer.getName() != null + && ! writer.getName().equals(reader.getName()))) { + result = mkErrorAction("Found " + writer + ", expecting " + reader); + } else { + outer: + do { + Map wfields = writer.getFields(); + Map rfields = reader.getFields(); + /* + * For now every field in read-record with no default value + * must be in write-record. + * Write record may have additional fields, which will be + * skipped during read. + */ + + boolean useDefault = false; + int rsize = 0; + for (Map.Entry e : rfields.entrySet()) { + Field f = wfields.get(e.getKey()); + if (f == null) { + Field wf = e.getValue(); + if (wf.defaultValue() == null) { + result = mkErrorAction("Found " + writer + ", expecting " + reader); + break outer; + } else { + useDefault = true; + } + } else { + rsize++; + } + } + int size = 2 * rsize + (wfields.size() - rsize); + if (useDefault) { + rsize++; + } + result = allocNonTerm(size); + int i = result + size; + for (Map.Entry wf : wfields.entrySet()) { + String fname = wf.getKey(); + Field rf = rfields.get(fname); + if (rf == null) { + set(--i, mkSkipAction(generate(wf.getValue().schema(), seen))); + } else { + set(--i, mkFieldAction(rf.pos(), fname)); + set(--i, generate(wf.getValue().schema(), rf.schema(), seen)); + } + } + /* + * Insert a "special" field action to indicate that there are + * no more fields for this record, but some reader fields + * need to be filled with default values. 
+ */ + if (useDefault) { + set(--i, mkFieldAction(-1, null)); + } + } while (false); + } + seen.put(wsc, result); + } + return seen.get(wsc); + + case UNION: + if (writerType != Schema.Type.UNION) { // Only reader is union + return mkNonTerm(UNION, mkReaderUnionAction(writer, reader, seen)); + } else { // Both reader and writer are unions + int result = allocUnion(writer.getTypes().size()); + int u = result; + for (Schema w : writer.getTypes()) { + set(u++, mkReaderUnionAction(w, reader, seen)); + } + return mkNonTerm(UNION, result); + } + + default: + throw new RuntimeException("Unexpected schema type: " + readerType); + } + } + + protected int mkString(String s) { + if (secondPass) { + strings[spos] = s; + } + return spos++; + } + + private final int mkErrorAction(String message) { + set(errorActions, mkString(message)); + return errorActions++; + } + + private final int mkReaderUnionAction(Schema w, Schema r, Map s) { + int j = bestBranch(r, w); + if (j < 0) { + return mkErrorAction("Found " + w + ", expecting " + r); + } else { + int result = readerUnionActions; + readerUnionActions += 2; + set(result, j); + set(result + 1, generate(w, r.getTypes().get(j), s)); + return result; + } + } + + private int bestBranch(Schema r, Schema w) { + Schema.Type vt = w.getType(); + // first scan for exact match + int j = 0; + for (Schema b : r.getTypes()) { + if (vt == b.getType()) + if (vt == Type.RECORD) { + String vname = w.getName(); + if (vname == null || vname.equals(b.getName())) + return j; + } else + return j; + j++; + } + + // then scan match via numeric promotion + j = 0; + for (Schema b : r.getTypes()) { + switch (vt) { + case INT: + switch (b.getType()) { + case LONG: case DOUBLE: + return j; + } + break; + case LONG: + case FLOAT: + switch (b.getType()) { + case DOUBLE: + return j; + } + break; + } + j++; + } + return -1; + } + +private final int mkResolverAction(int actual) { + set(resolverActions, actual); + return resolverActions++; + } + + private final int 
mkFieldAction(int index, String name) { + int result = fieldActions; + fieldActions += 2; + set(result, index); + set(result + 1, mkString(name)); + return result; + } + + private final int mkSkipAction(int toSkip) { + set(skipActions, toSkip); + return skipActions++; + } + + private final int allocWriterUnionAction(int len) { + set(writerUnionActions, len); + writerUnionActions += (len + 1); + return writerUnionActions - len; + } + + private final int allocEnumAction(int len) { + set(enumActions, len); + enumActions += (len + 1); + return enumActions - len; + } + + private final int mkEnumAction(String n, Schema r) { + int k = r.getEnumOrdinal(n); + if (k < 0) { + return mkErrorAction("Unknown enum: " + n); + } else { + set(enumActions++, 1); + set(enumActions++, k); + return enumActions - 1; + } + } + + public final boolean isAction(int sym) { + return enums <= sym && sym < enumActions; + } + + public final boolean isErrorAction(int sym) { + return enums <= sym && sym < errorActions; + } + + public final boolean isReaderUnionAction(int sym) { + return errorActions <= sym && sym < readerUnionActions; + } + + public final boolean isResolverAction(int sym) { + return readerUnionActions <= sym && sym < resolverActions; + } + + public final boolean isFieldAction(int sym) { + return resolverActions <= sym && sym < fieldActions; + } + + public final boolean isSkipAction(int sym) { + return fieldActions <= sym && sym < skipActions; + } + + public final boolean isWriterUnionAction(int sym) { + return skipActions <= sym && sym < writerUnionActions; + } + + public final boolean isEnumAction(int sym) { + return writerUnionActions <= sym && sym < enumActions; + } + + public final String getMessage(int errorAction) { + return strings[prods[errorAction]]; + } + + public final int getResolverActual(int resolverAction) { + return prods[resolverAction]; + } + + public final String getFieldName(int fieldAction) { + return strings[prods[fieldAction + 1]]; + } + + public final int 
getFieldIndex(int fieldAction) { + return prods[fieldAction]; + } + + public final int getProductionToSkip(int skipAction) { + return prods[skipAction]; + } + + public final int getReaderUnionIndex(int readerUnionAction) { + return prods[readerUnionAction]; + } + + public final int getReaderUnionSym(int readerUnionAction) { + return prods[readerUnionAction + 1]; + } + + public final int getEnumAction(int ntsym, int e) { + return prods[ntsym + e]; + } + + public final int getEnumValue(int enumAction) { + return prods[enumAction]; + } + + /** Clever trick which differentiates items put into + * seen by {@link #count(Schema,Map)} + * from those put in by {@link + * #count(Schema,Schema,Map)}. */ + protected static class LitS2 extends LitS { + public Schema expected; + public LitS2(Schema actual, Schema expected) { + super(actual); + this.expected = expected; + } + public boolean equals(Object o) { + if (! (o instanceof LitS2)) return false; + LitS2 other = (LitS2)o; + return actual == other.actual && expected == other.expected; + } + public int hashCode() { + return super.hashCode() + expected.hashCode(); + } + } + + public String toString() { + StringBuffer sb = new StringBuffer(); + appendTo(sb, "root", root); + appendTo(sb, "errorActions", errorActions); + appendTo(sb, "readerUnionActions", readerUnionActions); + appendTo(sb, "resolverActions", resolverActions); + appendTo(sb, "nonTerminals", nonTerminals); + appendTo(sb, "unions", unions); + appendTo(sb, "repeaters", repeaters); + appendTo(sb, "fieldActions", fieldActions); + appendTo(sb, "skipActions", skipActions); + appendTo(sb, "writerUnionActions", writerUnionActions); + appendTo(sb, "enumActions", enumActions); + + appendTo(sb, prods); + return sb.toString(); + } + +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java?rev=788462&view=auto 
============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java Thu Jun 25 18:53:16 2009 @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; + +import org.apache.avro.AvroTypeException; +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; + +/** {@link Decoder} that ensures that the sequence of operations conforms + * to a schema. + *

See the parser documentation for + * information on how this works. + */ +public class ValidatingDecoder extends Decoder { + protected final Decoder in; + protected int[] stack; + protected int pos; + ParsingTable table; + + ValidatingDecoder(ParsingTable table, Decoder in) + throws IOException { + this.in = in; + this.stack = new int[5]; // Start small to make sure expansion code works + this.pos = 0; + this.table = table; + reset(); + } + + public ValidatingDecoder(Schema schema, Decoder in) + throws IOException { + this(new ParsingTable(schema), in); + } + + @Override + public void init(InputStream in) { + this.in.init(in); + } + + @Override + public void readNull() throws IOException { + advance(ParsingTable.NULL); + in.readNull(); + } + + @Override + public boolean readBoolean() throws IOException { + advance(ParsingTable.BOOLEAN); + return in.readBoolean(); + } + + @Override + public int readInt() throws IOException { + advance(ParsingTable.INT); + return in.readInt(); + } + + @Override + public long readLong() throws IOException { + advance(ParsingTable.LONG); + return in.readLong(); + } + + @Override + public float readFloat() throws IOException { + advance(ParsingTable.FLOAT); + return in.readFloat(); + } + + @Override + public double readDouble() throws IOException { + advance(ParsingTable.DOUBLE); + return in.readDouble(); + } + + @Override + public Utf8 readString(Utf8 old) throws IOException { + advance(ParsingTable.STRING); + return in.readString(old); + } + + @Override + public void skipString() throws IOException { + advance(ParsingTable.STRING); + in.skipString(); + } + + @Override + public ByteBuffer readBytes(ByteBuffer old) throws IOException { + advance(ParsingTable.BYTES); + return in.readBytes(old); + } + + @Override + public void skipBytes() throws IOException { + advance(ParsingTable.BYTES); + in.skipBytes(); + } + + @Override + public void readFixed(byte[] bytes, int start, int length) throws IOException { + advance(ParsingTable.FIXED); + 
int top = stack[--pos]; + assert table.isFixed(top); + if (table.getFixedSize(top) != length) { + throw new AvroTypeException( + "Incorrect length for fiexed binary: expected " + + table.getFixedSize(top) + " but received " + length + " bytes."); + } + in.readFixed(bytes, start, length); + } + + @Override + public void skipFixed(int length) throws IOException { + advance(ParsingTable.FIXED); + int top = stack[--pos]; + assert table.isFixed(top); + if (table.getFixedSize(top) != length) { + throw new AvroTypeException( + "Incorrect length for fiexed binary: expected " + + table.getFixedSize(top) + " but received " + length + " bytes."); + } + in.skipFixed(length); + } + + @Override + public int readEnum() throws IOException { + advance(ParsingTable.ENUM); + int top = stack[--pos]; + assert table.isEnum(top); + int result = in.readEnum(); + if (result < 0 || result >= table.getEnumMax(top)) { + throw new AvroTypeException( + "Enumeration out of range: max is " + + table.getEnumMax(top) + " but received " + result); + } + return result; + } + + @Override + public long readArrayStart() throws IOException { + advance(ParsingTable.ARRAYSTART); + long result = in.readArrayStart(); + if (result == 0) { + advance(ParsingTable.ARRAYEND); + } + return result; + } + + @Override + public long arrayNext() throws IOException { + long result = in.readLong(); + if (result == 0) { + advance(ParsingTable.ARRAYEND); + } + return result; + } + + @Override + public long skipArray() throws IOException { + advance(ParsingTable.ARRAYSTART); + skipFullArray(stack[--pos]); + advance(ParsingTable.ARRAYEND); + return 0; + } + + @Override + public long readMapStart() throws IOException { + advance(ParsingTable.MAPSTART); + long result = in.readMapStart(); + if (result == 0) { + advance(ParsingTable.MAPEND); + } + return result; + } + + @Override + public long mapNext() throws IOException { + long result = in.mapNext(); + if (result == 0) { + advance(ParsingTable.MAPEND); + } + return result; + 
} + + @Override + public long skipMap() throws IOException { + advance(ParsingTable.MAPSTART); + skipFullMap(stack[--pos]); + advance(ParsingTable.MAPEND); + return 0; + } + + @Override + public int readIndex() throws IOException { + advance(ParsingTable.UNION); + int top = stack[--pos]; + assert table.isUnion(top); + int result = in.readIndex(); + stack[pos++] = table.getBranch(top, result); + return result; + } + + @Override + public void reset() throws IOException { + /* + * See the design note for ValidatingValueWriter.reset() on why such + * a method is needed. + */ + while (pos > 0) { + skipSymbol(stack[pos], -1); + } + stack[pos++] = table.root; + } + + /** Skip the values described by a production. */ + protected void skipProduction(int ntsym) throws IOException { + if (! table.isNonTerminal(ntsym) && ! table.isRepeater(ntsym)) { + throw new IllegalArgumentException("Can't skip a " + ntsym); + } + for (int i = table.size(ntsym) - 1; i >= 0; i--) { + int sym = table.prods[ntsym + i]; + if (table.isRepeater(sym)) + continue; // Don't recurse -- our caller will do that for us + + i = skipSymbol(table.prods[ntsym + i], ntsym + i) - ntsym; + } + } + + protected int skipSymbol(int sym, int p) throws IOException { + switch (sym) { + case ParsingTable.NULL: + in.readNull(); + break; + case ParsingTable.BOOLEAN: + in.readBoolean(); + break; + case ParsingTable.INT: + in.readInt(); + break; + case ParsingTable.LONG: + in.readLong(); + break; + case ParsingTable.FLOAT: + in.readFloat(); + break; + case ParsingTable.DOUBLE: + in.readDouble(); + break; + case ParsingTable.STRING: + in.skipString(); + break; + case ParsingTable.BYTES: + in.skipBytes(); + break; + case ParsingTable.FIXED: + in.skipFixed(table.getFixedSize(table.prods[--p])); + break; + case ParsingTable.UNION: + skipSymbol(table.getBranch(table.prods[--p], in.readIndex()), -1); + break; + case ParsingTable.ARRAYSTART: + while (p-- >= 0) { + int element = table.prods[p]; + if (table.isRepeater(element)) { 
+ skipFullArray(element); + break; + } + } + while (table.prods[p] != ParsingTable.ARRAYEND) { + p--; // (skip action syms) + } + break; + case ParsingTable.MAPSTART: + while (p-- >= 0) { + int element = table.prods[p]; + if (table.isRepeater(element)) { + skipFullMap(element); + break; + } + } + while (table.prods[p] != ParsingTable.MAPEND) { + p--; // (skip action syms) + } + break; + default: // record + skipProduction(sym); + } + return p; + } + + private final void skipFullMap(int element) throws IOException { + for (long c = in.skipMap(); c != 0; c = in.skipMap()) { + skipElements(element, c); + } + } + + private final void skipElements(int element, long count) throws IOException { + while (count-- > 0) { + skipProduction(element); + } + } + + private final void skipFullArray(int element) throws IOException { + for (long c = in.skipArray(); c != 0; c = in.skipArray()) { + skipElements(element, c); + } + } + + protected int advance(int input) throws IOException { + int top = stack[--pos]; + while (! table.isTerminal(top)) { + if (! 
table.isRepeater(top) + || (input != ParsingTable.ARRAYEND && input != ParsingTable.MAPEND)) { + int plen = table.size(top); + if (stack.length < pos + plen) { + stack = expand(stack, pos + plen); + } + System.arraycopy(table.prods, top, stack, pos, plen); + pos += plen; + } + top = stack[--pos]; + } + assert table.isTerminal(top); + if (top != input) { + throw new AvroTypeException("Attempt to read a " + + ParsingTable.getTerminalName(input) + " when a " + + ParsingTable.getTerminalName(top) + " was expected."); + } + return top; + } + + protected static int[] expand(int[] stack, int len) { + while (stack.length < len) { + stack = Arrays.copyOf(stack, stack.length + Math.max(stack.length, 1024)); + } + return stack; + } +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java?rev=788462&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java Thu Jun 25 18:53:16 2009 @@ -0,0 +1,225 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +import org.apache.avro.AvroTypeException; +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; + +/** {@link Encoder} that ensures that the sequence of operations conforms + * to a schema. + * + *

See the parser documentation for information on how
 * this works.
 */
public class ValidatingEncoder extends Encoder {
  /** Encoder that receives every operation once it has been validated. */
  private final Encoder out;
  /** Parser stack of grammar symbols; grown on demand by {@link #advance}. */
  protected int[] stack;
  /** Number of occupied slots in {@link #stack}. */
  protected int pos;
  /** Parsing table built from the schema this encoder validates against. */
  protected ParsingTable table;

  /**
   * Creates an encoder that checks each operation against {@code schema}
   * before passing it on to {@code out}.
   *
   * @param schema the schema every written object must conform to
   * @param out the encoder that performs the actual writing
   */
  public ValidatingEncoder(Schema schema, Encoder out) {
    this.out = out;
    this.stack = new int[5]; // Start small to make sure expansion code works
    this.pos = 0;
    this.table = new ParsingTable(schema);
    stack[pos++] = table.root;
  }

  /**
   * Flushes pending output, then points the underlying encoder at
   * {@code out}. The validation state (the parser stack) is left untouched.
   */
  @Override
  public void init(OutputStream out) throws IOException {
    flush();
    this.out.init(out);
  }

  /** Flushes the underlying encoder. */
  @Override
  public void flush() throws IOException {
    out.flush();
  }

  @Override
  public void writeNull() throws IOException {
    advance(ParsingTable.NULL);
    out.writeNull();
  }

  @Override
  public void writeBoolean(boolean b) throws IOException {
    advance(ParsingTable.BOOLEAN);
    out.writeBoolean(b);
  }

  /**
   * Validates an int and forwards it to the underlying encoder's
   * {@code writeInt}. Forwarding to {@code writeLong} instead would be
   * rejected by a wrapped encoder that itself validates (it expects INT, not
   * LONG), and would be wrong for any encoder whose int and long encodings
   * differ.
   */
  @Override
  public void writeInt(int n) throws IOException {
    advance(ParsingTable.INT);
    out.writeInt(n);
  }

  @Override
  public void writeLong(long n) throws IOException {
    advance(ParsingTable.LONG);
    out.writeLong(n);
  }

  @Override
  public void writeFloat(float f) throws IOException {
    advance(ParsingTable.FLOAT);
    out.writeFloat(f);
  }

  @Override
  public void writeDouble(double d) throws IOException {
    advance(ParsingTable.DOUBLE);
    out.writeDouble(d);
  }

  @Override
  public void writeString(Utf8 utf8) throws IOException {
    advance(ParsingTable.STRING);
    out.writeString(utf8);
  }

  @Override
  public void writeBytes(ByteBuffer bytes) throws IOException {
    advance(ParsingTable.BYTES);
    out.writeBytes(bytes);
  }

  @Override
  public void writeBytes(byte[] bytes, int start, int len) throws IOException {
    advance(ParsingTable.BYTES);
    out.writeBytes(bytes, start, len);
  }

  /**
   * Validates a fixed value and forwards it.
   *
   * @throws AvroTypeException if a fixed is not expected here, or if
   *     {@code length} differs from the size declared by the schema
   */
  @Override
  public void writeFixed(byte[] bytes, int start, int length)
      throws IOException {
    advance(ParsingTable.FIXED);
    // The symbol under the FIXED terminal carries the declared size.
    int top = stack[--pos];
    assert table.isFixed(top);
    if (table.getFixedSize(top) != length) {
      throw new AvroTypeException(
          "Incorrect length for fixed binary: expected " +
          table.getFixedSize(top) + " but received " + length + " bytes.");
    }
    out.writeFixed(bytes, start, length);
  }

  /**
   * Validates an enum ordinal and forwards it.
   *
   * @throws AvroTypeException if an enum is not expected here, or if
   *     {@code e} is negative or not below the schema's enum limit
   */
  @Override
  public void writeEnum(int e) throws IOException {
    advance(ParsingTable.ENUM);
    // The symbol under the ENUM terminal carries the number of symbols.
    int top = stack[--pos];
    assert table.isEnum(top);
    if (e < 0 || e >= table.getEnumMax(top)) {
      throw new AvroTypeException(
          "Enumeration out of range: max is " +
          table.getEnumMax(top) + " but received " + e);
    }
    out.writeEnum(e);
  }

  @Override
  public void writeArrayStart() throws IOException {
    advance(ParsingTable.ARRAYSTART);
    out.writeArrayStart();
  }

  /** Forwarded as-is; item counts are not checked by this encoder. */
  @Override
  public void setItemCount(long itemCount) throws IOException {
    out.setItemCount(itemCount);
  }

  /** Forwarded as-is; item boundaries are not checked by this encoder. */
  @Override
  public void startItem() throws IOException {
    out.startItem();
  }

  @Override
  public void writeArrayEnd() throws IOException {
    advance(ParsingTable.ARRAYEND);
    out.writeArrayEnd();
  }

  @Override
  public void writeMapStart() throws IOException {
    advance(ParsingTable.MAPSTART);
    out.writeMapStart();
  }

  @Override
  public void writeMapEnd() throws IOException {
    advance(ParsingTable.MAPEND);
    out.writeMapEnd();
  }

  /**
   * Validates a union branch selection, replaces the union symbol on the
   * stack with the chosen branch, and forwards the index.
   */
  @Override
  public void writeIndex(int unionIndex) throws IOException {
    advance(ParsingTable.UNION);
    int top = stack[--pos];
    assert table.isUnion(top);
    stack[pos++] = table.getBranch(top, unionIndex);
    out.writeIndex(unionIndex);
  }

  /**
   * Expands non-terminals until a terminal is on top of the stack, and checks
   * that it equals {@code input}. A repeater is dropped rather than expanded
   * when {@code input} ends an array or map.
   *
   * @param input the terminal the caller is about to write
   * @return the matched terminal
   * @throws AvroTypeException if a different terminal was expected, or if
   *     nothing more is expected (see {@link #reset()})
   */
  private int advance(int input) {
    // Loop-invariant: does this write close an array or a map?
    final boolean endsRepetition =
        input == ParsingTable.ARRAYEND || input == ParsingTable.MAPEND;
    int top = pop(input);
    while (! table.isTerminal(top)) {
      if (! table.isRepeater(top) || ! endsRepetition) {
        int plen = table.size(top);
        if (stack.length < pos + plen) {
          stack = ValidatingDecoder.expand(stack, pos + plen);
        }
        System.arraycopy(table.prods, top, stack, pos, plen);
        pos += plen;
      }
      top = pop(input);
    }
    assert table.isTerminal(top);
    if (top != input) {
      throw new AvroTypeException("Attempt to write a " +
          ParsingTable.getTerminalName(input) + " when a " +
          ParsingTable.getTerminalName(top) + " was expected.");
    }
    return top;
  }

  /**
   * Pops the top symbol off the parser stack.
   *
   * @param input the terminal being written; used only in the error message
   * @throws AvroTypeException if the stack is empty. Previously this surfaced
   *     as an ArrayIndexOutOfBoundsException.
   */
  private int pop(int input) {
    if (pos == 0) {
      throw new AvroTypeException("Attempt to write a " +
          ParsingTable.getTerminalName(input) +
          " when no more data is expected; call reset() to begin a new object.");
    }
    return stack[--pos];
  }

  /**
   * After writing a complete object that conforms to the schema or after an
   * error, if you want to start writing another
   * object, call this method.
   */
  public void reset() {
    /*
     * Design note: Why is this a separate method? Why can't we auto reset
     * in advance() when the stack becomes empty which is an indication that
     * an object has been completed?
     *
     * (1) advance() is called in an inner loop. Saving cycles there is worthy.
     * (2) The client may actually be writing something beyond the current
     * object and the writer will think it has started writing the next object.
     * This may lead to certain errors not being caught.
     * (3) We need reset() anyway to take the writer out of any error state.
     */
    pos = 0;
    stack[pos++] = table.root;
  }
}
Propchange: hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/
------------------------------------------------------------------------------
    svn:mergeinfo =