Return-Path: Delivered-To: apmail-hadoop-avro-commits-archive@minotaur.apache.org Received: (qmail 28646 invoked from network); 22 Jul 2009 20:20:24 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 22 Jul 2009 20:20:24 -0000 Received: (qmail 80127 invoked by uid 500); 22 Jul 2009 20:21:29 -0000 Delivered-To: apmail-hadoop-avro-commits-archive@hadoop.apache.org Received: (qmail 80095 invoked by uid 500); 22 Jul 2009 20:21:29 -0000 Mailing-List: contact avro-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: avro-dev@hadoop.apache.org Delivered-To: mailing list avro-commits@hadoop.apache.org Received: (qmail 80085 invoked by uid 99); 22 Jul 2009 20:21:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2009 20:21:29 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jul 2009 20:21:17 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 24DC0238886C; Wed, 22 Jul 2009 20:20:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r796868 [1/2] - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/generic/ src/java/org/apache/avro/io/ src/java/org/apache/avro/io/doc-files/ src/java/org/apache/avro/io/parsing/ src/java/org/apache/avro/io/parsing/doc-files/ src/java/org... 
Date: Wed, 22 Jul 2009 20:20:56 -0000 To: avro-commits@hadoop.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090722202057.24DC0238886C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Wed Jul 22 20:20:55 2009 New Revision: 796868 URL: http://svn.apache.org/viewvc?rev=796868&view=rev Log: AVRO-50. Implement JSON data codec in Java. Contributed by Thiruvalluvan M.G. and cutting. Added: hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/JsonEncoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingDecoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingEncoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/ hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/Parser.java hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/SkipParser.java hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/Symbol.java hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/ValidatingGrammarGenerator.java hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/doc-files/ - copied from r796857, hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/ hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/package.html Removed: hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingTable.java hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingTable.java hadoop/avro/trunk/src/java/org/apache/avro/io/doc-files/ hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestParsingTable.java hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestResolvingTable.java Modified: hadoop/avro/trunk/CHANGES.txt hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java 
hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java hadoop/avro/trunk/src/java/org/apache/avro/reflect/ReflectDatumWriter.java hadoop/avro/trunk/src/test/java/org/apache/avro/TestSchema.java hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestResolvingIO.java hadoop/avro/trunk/src/test/java/org/apache/avro/io/TestValidatingIO.java Modified: hadoop/avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=796868&r1=796867&r2=796868&view=diff ============================================================================== --- hadoop/avro/trunk/CHANGES.txt (original) +++ hadoop/avro/trunk/CHANGES.txt Wed Jul 22 20:20:55 2009 @@ -6,6 +6,9 @@ NEW FEATURES + AVRO-50. Implmenent JSON data codec in Java. (Thiruvalluvan + M. G. & cutting) + IMPROVEMENTS AVRO-71. C++: make deserializer more generic. 
(Scott Banachowski Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java?rev=796868&r1=796867&r2=796868&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumReader.java Wed Jul 22 20:20:55 2009 @@ -78,7 +78,7 @@ case FLOAT: return in.readFloat(); case DOUBLE: return in.readDouble(); case BOOLEAN: return in.readBoolean(); - case NULL: return null; + case NULL: in.readNull(); return null; default: throw new AvroRuntimeException("Unknown type: "+actual); } } @@ -410,8 +410,6 @@ * calls {@link ByteBuffer#wrap(byte[])}.*/ protected Object createBytes(byte[] value) { return ByteBuffer.wrap(value); } - private static final Schema STRING_SCHEMA = Schema.create(Type.STRING); - /** Skip an instance of a schema. 
*/ public static void skip(Schema schema, Decoder in) throws IOException { switch (schema.getType()) { @@ -434,7 +432,7 @@ Schema value = schema.getValueType(); for (long l = in.skipMap(); l > 0; l = in.skipMap()) { for (long i = 0; i < l; i++) { - skip(STRING_SCHEMA, in); + in.skipString(); skip(value, in); } } @@ -446,6 +444,8 @@ in.skipFixed(schema.getFixedSize()); break; case STRING: + in.skipString(); + break; case BYTES: in.skipBytes(); break; Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java?rev=796868&r1=796867&r2=796868&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericDatumWriter.java Wed Jul 22 20:20:55 2009 @@ -68,7 +68,7 @@ case FLOAT: out.writeFloat((Float)datum); break; case DOUBLE: out.writeDouble((Double)datum); break; case BOOLEAN: out.writeBoolean((Boolean)datum); break; - case NULL: break; + case NULL: out.writeNull(); break; default: error(schema,datum); } } Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java?rev=796868&r1=796867&r2=796868&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/Decoder.java Wed Jul 22 20:20:55 2009 @@ -42,8 +42,9 @@ public abstract class Decoder { /** Start reading against a different input stream. Stateful - * subclasses will reset their states to their initial state. */ - public abstract void init(InputStream in); + * subclasses will reset their states to their initial state. 
+ * @throws IOException */ + public abstract void init(InputStream in) throws IOException; /** * "Reads" a null value. (Doesn't actually read anything, but @@ -276,12 +277,4 @@ * union is not the type of the next value to be read */ public abstract int readIndex() throws IOException; - - /** - * After reading a complete object that conforms to the schema or after an - * error, if you want to start reading another object, call this method. - */ - - public void reset() throws IOException { - } } Added: hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/JsonDecoder.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,397 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.avro.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; + +import org.apache.avro.AvroTypeException; +import org.apache.avro.Schema; +import org.apache.avro.io.parsing.JsonGrammarGenerator; +import org.apache.avro.io.parsing.Parser; +import org.apache.avro.io.parsing.Symbol; +import org.apache.avro.util.Utf8; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +/** A {@link Decoder} for Avro's JSON data encoding. */ +public class JsonDecoder extends ParsingDecoder + implements Parser.ActionHandler { + private JsonParser in; + + static String CHARSET = "ISO-8859-1"; + + JsonDecoder(Symbol root, InputStream in) throws IOException { + super(root); + init(in); + } + + public JsonDecoder(Schema schema, InputStream in) throws IOException { + this(new JsonGrammarGenerator().generate(schema), in); + } + + @Override + public void init(InputStream in) throws IOException { + parser.reset(); + this.in = new JsonFactory().createJsonParser(in); + this.in.nextToken(); + } + + @Override + public void readNull() throws IOException { + parser.advance(Symbol.NULL); + if (in.getCurrentToken() == JsonToken.VALUE_NULL) { + in.nextToken(); + } else { + throw error("null"); + } + } + + @Override + public boolean readBoolean() throws IOException { + parser.advance(Symbol.BOOLEAN); + JsonToken t = in.getCurrentToken(); + if (t == JsonToken.VALUE_TRUE || t == JsonToken.VALUE_FALSE) { + in.nextToken(); + return t == JsonToken.VALUE_TRUE; + } else { + throw error("boolean"); + } + } + + @Override + public int readInt() throws IOException { + parser.advance(Symbol.INT); + if (in.getCurrentToken() == JsonToken.VALUE_NUMBER_INT) { + int result = in.getIntValue(); + in.nextToken(); + return result; + } else { + throw error("int"); + } + } + + @Override + public long 
readLong() throws IOException { + parser.advance(Symbol.LONG); + if (in.getCurrentToken() == JsonToken.VALUE_NUMBER_INT) { + long result = in.getLongValue(); + in.nextToken(); + return result; + } else { + throw error("long"); + } + } + + @Override + public float readFloat() throws IOException { + parser.advance(Symbol.FLOAT); + if (in.getCurrentToken() == JsonToken.VALUE_NUMBER_FLOAT) { + float result = in.getFloatValue(); + in.nextToken(); + return result; + } else { + throw error("float"); + } + } + + @Override + public double readDouble() throws IOException { + parser.advance(Symbol.DOUBLE); + if (in.getCurrentToken() == JsonToken.VALUE_NUMBER_FLOAT) { + double result = in.getDoubleValue(); + in.nextToken(); + return result; + } else { + throw error("double"); + } + } + + @Override + public Utf8 readString(Utf8 old) throws IOException { + parser.advance(Symbol.STRING); + if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { + parser.advance(Symbol.MAP_KEY_MARKER); + if (in.getCurrentToken() != JsonToken.FIELD_NAME) { + throw error("map-key"); + } + } else { + if (in.getCurrentToken() != JsonToken.VALUE_STRING) { + throw error("string"); + } + } + String result = in.getText(); + in.nextToken(); + return new Utf8(result); + } + + @Override + public void skipString() throws IOException { + parser.advance(Symbol.STRING); + if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { + parser.advance(Symbol.MAP_KEY_MARKER); + if (in.getCurrentToken() != JsonToken.FIELD_NAME) { + throw error("map-key"); + } + } else { + if (in.getCurrentToken() != JsonToken.VALUE_STRING) { + throw error("string"); + } + } + in.nextToken(); + } + + @Override + public ByteBuffer readBytes(ByteBuffer old) throws IOException { + parser.advance(Symbol.BYTES); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + byte[] result = readByteArray(); + in.nextToken(); + return ByteBuffer.wrap(result); + } else { + throw error("bytes"); + } + } + + private byte[] readByteArray() throws 
UnsupportedEncodingException, + IOException, JsonParseException { + byte[] result = in.getText().getBytes(CHARSET); + return result; + } + + @Override + public void skipBytes() throws IOException { + parser.advance(Symbol.BYTES); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + in.nextToken(); + } else { + throw error("bytes"); + } + } + + private void checkFixed(int size) throws IOException { + parser.advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + if (size != top.size) { + throw new AvroTypeException( + "Incorrect length for fixed binary: expected " + + top.size + " but received " + size + " bytes."); + } + } + + @Override + public void readFixed(byte[] bytes, int start, int len) throws IOException { + checkFixed(len); + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + byte[] result = readByteArray(); + in.nextToken(); + if (result.length != len) { + throw new AvroTypeException("Expected fixed length " + len + + ", but got" + result.length); + } + System.arraycopy(result, 0, bytes, start, len); + } else { + throw error("fixed"); + } + } + + @Override + public void skipFixed(int length) throws IOException { + checkFixed(length); + doSkipFixed(length); + } + + private void doSkipFixed(int length) throws IOException, JsonParseException { + if (in.getCurrentToken() == JsonToken.VALUE_STRING) { + byte[] result = readByteArray(); + in.nextToken(); + if (result.length != length) { + throw new AvroTypeException("Expected fixed length " + length + + ", but got" + result.length); + } + } else { + throw error("fixed"); + } + } + + @Override + protected void skipFixed() throws IOException { + parser.advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + doSkipFixed(top.size); + } + + @Override + public int readEnum() throws IOException { + parser.advance(Symbol.ENUM); + Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol(); + if 
(in.getCurrentToken() == JsonToken.VALUE_STRING) { + in.getText(); + int n = top.findLabel(in.getText()); + if (n >= 0) { + in.nextToken(); + return n; + } + throw new AvroTypeException("Unknown symbol in enum " + in.getText()); + } else { + throw error("fixed"); + } + } + + @Override + public long readArrayStart() throws IOException { + parser.advance(Symbol.ARRAY_START); + if (in.getCurrentToken() == JsonToken.START_ARRAY) { + in.nextToken(); + return doArrayNext(); + } else { + throw error("array-start"); + } + } + + @Override + public long arrayNext() throws IOException { + parser.advance(Symbol.ITEM_END); + return doArrayNext(); + } + + private long doArrayNext() throws IOException, JsonParseException { + if (in.getCurrentToken() == JsonToken.END_ARRAY) { + parser.advance(Symbol.ARRAY_END); + in.nextToken(); + return 0; + } else { + return 1; + } + } + + @Override + public long skipArray() throws IOException { + parser.advance(Symbol.ARRAY_START); + if (in.getCurrentToken() == JsonToken.START_ARRAY) { + in.skipChildren(); + in.nextToken(); + parser.advance(Symbol.ARRAY_END); + } else { + throw error("array-start"); + } + return 0; + } + + @Override + public long readMapStart() throws IOException { + parser.advance(Symbol.MAP_START); + if (in.getCurrentToken() == JsonToken.START_OBJECT) { + in.nextToken(); + return doMapNext(); + } else { + throw error("map-start"); + } + } + + @Override + public long mapNext() throws IOException { + parser.advance(Symbol.ITEM_END); + return doMapNext(); + } + + private long doMapNext() throws IOException, JsonParseException { + if (in.getCurrentToken() == JsonToken.END_OBJECT) { + in.nextToken(); + parser.advance(Symbol.MAP_END); + return 0; + } else { + return 1; + } + } + + @Override + public long skipMap() throws IOException { + parser.advance(Symbol.MAP_START); + if (in.getCurrentToken() == JsonToken.START_OBJECT) { + in.skipChildren(); + in.nextToken(); + parser.advance(Symbol.MAP_END); + } else { + throw 
error("map-start"); + } + return 0; + } + + @Override + public int readIndex() throws IOException { + parser.advance(Symbol.UNION); + Symbol.Alternative a = (Symbol.Alternative) parser.popSymbol(); + if (in.getCurrentToken() == JsonToken.START_OBJECT && + in.nextToken() == JsonToken.FIELD_NAME) { + String label = in.getText(); + in.nextToken(); + int n = a.findLabel(label); + if (n < 0) { + throw new AvroTypeException("Unknown union branch " + label); + } + parser.pushSymbol(Symbol.UNION_END); + parser.pushSymbol(a.getSymbol(n)); + return n; + } else { + throw error("start-union"); + } + } + + public Symbol doAction(Symbol input, Symbol top) throws IOException { + if (top instanceof Symbol.FieldAdjustAction) { + Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; + if (in.getCurrentToken() == JsonToken.FIELD_NAME) { + String fn = in.getCurrentName(); + if (fa.fname.equals(fn)) { + in.nextToken(); + return Symbol.CONTINUE; + } else { + throw new AvroTypeException("Expected field name " + fa.fname + + " got " + in.getCurrentName()); + } + } + } else if (top == Symbol.RECORD_START) { + if (in.getCurrentToken() == JsonToken.START_OBJECT) { + in.nextToken(); + } else { + throw error("record-start"); + } + } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) { + if (in.getCurrentToken() == JsonToken.END_OBJECT) { + in.nextToken(); + } else { + throw error(top == Symbol.RECORD_END ? "record-end" : "union-end"); + } + } else { + throw new AvroTypeException("Unknown action symbol " + top); + } + return Symbol.CONTINUE; + } + + private AvroTypeException error(String type) { + return new AvroTypeException("Expected " + type + + ". 
Got " + in.getCurrentToken()); + } + + public void setItemCount(long itemCount) throws IOException { + } +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/JsonEncoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/JsonEncoder.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/JsonEncoder.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/JsonEncoder.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.avro.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.BitSet; + +import org.apache.avro.AvroTypeException; +import org.apache.avro.Schema; +import org.apache.avro.io.parsing.JsonGrammarGenerator; +import org.apache.avro.io.parsing.Parser; +import org.apache.avro.io.parsing.Symbol; +import org.apache.avro.util.Utf8; +import org.codehaus.jackson.JsonEncoding; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonGenerator; + +/** An {@link Encoder} for Avro's JSON data encoding. */ +public class JsonEncoder extends ParsingEncoder implements Parser.ActionHandler { + final Parser parser; + private JsonGenerator out; + /** + * Has anything been written into the collections? + */ + protected BitSet isEmpty = new BitSet(); + + public JsonEncoder(Schema sc, OutputStream out) throws IOException { + this(sc, new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8)); + } + + public JsonEncoder(Schema sc, JsonGenerator out) throws IOException { + this.out = out; + this.parser = + new Parser(new JsonGrammarGenerator().generate(sc), this); + } + + @Override + public void flush() throws IOException { + if (parser.depth() > 1) { + parser.advance(Symbol.END); + } + out.flush(); + } + + @Override + public void init(OutputStream out) throws IOException { + if (this.out != null) { + flush(); + } + this.out = new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8); + } + + @Override + public void writeNull() throws IOException { + parser.advance(Symbol.NULL); + out.writeNull(); + } + + @Override + public void writeBoolean(boolean b) throws IOException { + parser.advance(Symbol.BOOLEAN); + out.writeBoolean(b); + } + + @Override + public void writeInt(int n) throws IOException { + parser.advance(Symbol.INT); + out.writeNumber(n); + } + + @Override + public void 
writeLong(long n) throws IOException { + parser.advance(Symbol.LONG); + out.writeNumber(n); + } + + @Override + public void writeFloat(float f) throws IOException { + parser.advance(Symbol.FLOAT); + out.writeNumber(f); + } + + @Override + public void writeDouble(double d) throws IOException { + parser.advance(Symbol.DOUBLE); + out.writeNumber(d); + } + + @Override + public void writeString(Utf8 utf8) throws IOException { + parser.advance(Symbol.STRING); + if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) { + parser.advance(Symbol.MAP_KEY_MARKER); + out.writeFieldName(utf8.toString()); + } else { + out.writeString(utf8.toString()); + } + } + + @Override + public void writeBytes(ByteBuffer bytes) throws IOException { + if (bytes.hasArray()) { + writeBytes(bytes.array(), bytes.position(), bytes.remaining()); + } else { + byte[] b = new byte[bytes.remaining()]; + for (int i = 0; i < b.length; i++) { + b[i] = bytes.get(); + } + writeBytes(b); + } + } + + @Override + public void writeBytes(byte[] bytes, int start, int len) throws IOException { + parser.advance(Symbol.BYTES); + writeByteArray(bytes, start, len); + } + + private void writeByteArray(byte[] bytes, int start, int len) + throws IOException, JsonGenerationException, UnsupportedEncodingException { + out.writeString( + new String(bytes, start, len, JsonDecoder.CHARSET)); + } + + @Override + public void writeFixed(byte[] bytes, int start, int len) throws IOException { + parser.advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + if (len != top.size) { + throw new AvroTypeException( + "Incorrect length for fixed binary: expected " + + top.size + " but received " + len + " bytes."); + } + writeByteArray(bytes, start, len); + } + + @Override + public void writeEnum(int e) throws IOException { + parser.advance(Symbol.ENUM); + Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol(); + if (e < 0 || e >= top.size) { + throw new AvroTypeException( + 
"Enumeration out of range: max is " + + top.size + " but received " + e); + } + out.writeString(top.getLabel(e)); + } + + @Override + public void writeArrayStart() throws IOException { + parser.advance(Symbol.ARRAY_START); + out.writeStartArray(); + push(); + isEmpty.set(depth()); + } + + @Override + public void writeArrayEnd() throws IOException { + if (! isEmpty.get(pos)) { + parser.advance(Symbol.ITEM_END); + } + pop(); + parser.advance(Symbol.ARRAY_END); + out.writeEndArray(); + } + + @Override + public void writeMapStart() throws IOException { + push(); + isEmpty.set(depth()); + + parser.advance(Symbol.MAP_START); + out.writeStartObject(); + } + + @Override + public void writeMapEnd() throws IOException { + if (! isEmpty.get(pos)) { + parser.advance(Symbol.ITEM_END); + } + pop(); + + parser.advance(Symbol.MAP_END); + out.writeEndObject(); + } + + @Override + public void startItem() throws IOException { + if (! isEmpty.get(pos)) { + parser.advance(Symbol.ITEM_END); + } + super.startItem(); + isEmpty.clear(depth()); + } + + @Override + public void writeIndex(int unionIndex) throws IOException { + parser.advance(Symbol.UNION); + Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); + out.writeStartObject(); + out.writeFieldName(top.getLabel(unionIndex)); + parser.pushSymbol(Symbol.UNION_END); + parser.pushSymbol(top.getSymbol(unionIndex)); + } + + @Override + public Symbol doAction(Symbol input, Symbol top) throws IOException { + if (top instanceof Symbol.FieldAdjustAction) { + Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top; + out.writeFieldName(fa.fname); + } else if (top == Symbol.RECORD_START) { + out.writeStartObject(); + } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) { + out.writeEndObject(); + } else { + throw new AvroTypeException("Unknown action symbol " + top); + } + return Symbol.CONTINUE; + } +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingDecoder.java URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingDecoder.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingDecoder.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingDecoder.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import java.io.IOException; + +import org.apache.avro.io.parsing.SkipParser; +import org.apache.avro.io.parsing.Symbol; +import org.apache.avro.io.parsing.Parser.ActionHandler; +import org.apache.avro.io.parsing.SkipParser.SkipHandler; + +/** Base class for parser-based + * {@link Decoder}s. 
*/ +public abstract class ParsingDecoder extends Decoder + implements ActionHandler, SkipHandler { + protected final SkipParser parser; + + public ParsingDecoder(Symbol root) throws IOException { + this.parser = new SkipParser(root, this, this); + } + + protected abstract void skipFixed() throws IOException; + + public void skipAction() throws IOException { + parser.popSymbol(); + } + + public void skipTopSymbol() throws IOException { + Symbol top = parser.topSymbol(); + if (top == Symbol.NULL) { + readNull(); + } if (top == Symbol.BOOLEAN) { + readBoolean(); + } else if (top == Symbol.INT) { + readInt(); + } else if (top == Symbol.LONG) { + readLong(); + } else if (top == Symbol.FLOAT) { + readFloat(); + } else if (top == Symbol.DOUBLE) { + readDouble(); + } else if (top == Symbol.STRING) { + skipString(); + } else if (top == Symbol.BYTES) { + skipBytes(); + } else if (top == Symbol.ENUM) { + readEnum(); + } else if (top == Symbol.FIXED) { + skipFixed(); + } else if (top == Symbol.UNION) { + readIndex(); + } else if (top == Symbol.ARRAY_START) { + skipArray(); + } else if (top == Symbol.MAP_START) { + skipMap(); + } + } + +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingEncoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingEncoder.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingEncoder.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ParsingEncoder.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,52 @@ +package org.apache.avro.io; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.avro.AvroTypeException; + +/** Base class for parser-based + * {@link Encoder}s. */ +public abstract class ParsingEncoder extends Encoder { + /** + * Tracks the number of items that remain to be written in + * the collections (array or map). 
+ */ + private long[] counts = new long[10]; + + protected int pos = -1; + + @Override + public void setItemCount(long itemCount) throws IOException { + if (counts[pos] != 0) { + throw new AvroTypeException("Incorrect number of items written. " + + counts[pos] + " more required."); + } + counts[pos] = itemCount; + } + + @Override + public void startItem() throws IOException { + counts[pos]--; + } + + /** Push a new collection on to the stack. */ + protected final void push() { + if (pos == counts.length) { + counts = Arrays.copyOf(counts, pos + 10); + } + counts[++pos] = 0; + } + + protected final void pop() { + if (counts[pos] != 0) { + throw new AvroTypeException("Incorrect number of items written. " + + counts[pos] + " more required."); + } + pos--; + } + + protected final int depth() { + return pos; + } +} Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java?rev=796868&r1=796867&r2=796868&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ResolvingDecoder.java Wed Jul 22 20:20:55 2009 @@ -17,10 +17,13 @@ */ package org.apache.avro.io; +import java.io.ByteArrayInputStream; import java.io.IOException; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; +import org.apache.avro.io.parsing.ResolvingGrammarGenerator; +import org.apache.avro.io.parsing.Symbol; /** * {@link Decoder} that peforms type-resolution between the reader's and @@ -38,12 +41,12 @@ *

See the parser documentation for * information on how this works.*/ public class ResolvingDecoder extends ValidatingDecoder { - private ResolvingTable rtable; + private Decoder backup; + public ResolvingDecoder(Schema writer, Schema reader, Decoder in) throws IOException { - super(new ResolvingTable(writer, reader), in); - rtable = (ResolvingTable) table; + super(new ResolvingGrammarGenerator().generate(writer, reader), in); } /** Returns the name of the next field of the record we're reading. @@ -53,8 +56,8 @@ * @throws IllegalStateExcpetion If we're not about to read a record-field */ public String readFieldName() throws IOException { - int actual = advance(ResolvingTable.FIELDACTION); - return rtable.getFieldName(actual); + return ((Symbol.FieldAdjustAction) parser.advance(Symbol.FIELD_ACTION)). + fname; } /** Returns the (zero-based) index of the next field of the record @@ -98,126 +101,103 @@ * */ public int readFieldIndex() throws IOException { - int actual = advance(ResolvingTable.FIELDACTION); - return rtable.getFieldIndex(actual); + return ((Symbol.FieldAdjustAction) parser.advance(Symbol.FIELD_ACTION)). 
+ rindex; } @Override public long readLong() throws IOException { - int actual = advance(ResolvingTable.LONG); - if (actual == ResolvingTable.INT) { + Symbol actual = parser.advance(Symbol.LONG); + if (actual == Symbol.INT) { return in.readInt(); - } else if (actual == ResolvingTable.DOUBLE) { + } else if (actual == Symbol.DOUBLE) { return (long) in.readDouble(); } else { - assert actual == ResolvingTable.LONG; + assert actual == Symbol.LONG; return in.readLong(); } } - + @Override public double readDouble() throws IOException { - int actual = advance(ResolvingTable.DOUBLE); - if (actual == ResolvingTable.INT) { + Symbol actual = parser.advance(Symbol.DOUBLE); + if (actual == Symbol.INT) { return (double) in.readInt(); - } else if (actual == ResolvingTable.LONG) { + } else if (actual == Symbol.LONG) { return (double) in.readLong(); - } else if (actual == ResolvingTable.FLOAT) { + } else if (actual == Symbol.FLOAT) { return (double) in.readFloat(); } else { - assert actual == ResolvingTable.DOUBLE; + assert actual == Symbol.DOUBLE; return in.readDouble(); } } @Override public int readEnum() throws IOException { - advance(ResolvingTable.ENUM); - int top = stack[--pos]; + parser.advance(Symbol.ENUM); + Symbol.EnumAdjustAction top = (Symbol.EnumAdjustAction) parser.popSymbol(); int n = in.readEnum(); - if (n >= 0 && n < rtable.size(top)) { - n = rtable.getEnumAction(top, n); - if (rtable.isEnumAction(n)) { - return rtable.getEnumValue(n); - } else { - assert rtable.isErrorAction(n); - throw new AvroTypeException(rtable.getMessage(n)); - } + Object o = top.adjustments[n]; + if (o instanceof Integer) { + return ((Integer) o).intValue(); } else { - throw new AvroTypeException("Enumeration out of range: " + n - + " max: " + rtable.size(top)); + throw new AvroTypeException((String) o); } } @Override public int readIndex() throws IOException { - advance(ParsingTable.UNION); - int actual = stack[--pos]; - if (rtable.isUnion(actual)) { - actual = rtable.getBranch(actual, 
in.readInt()); - } - if (rtable.isReaderUnionAction(actual)) { - // Both reader and writer where a union. Based on - // the writer's actual branch, go get the appropriate - // readerUnionAction - stack[pos++] = rtable.getReaderUnionSym(actual); - return rtable.getReaderUnionIndex(actual); - } else { - throw new AvroTypeException("Unexpected index read"); - } + parser.advance(Symbol.UNION); + Symbol.UnionAdjustAction top = (Symbol.UnionAdjustAction) parser.popSymbol(); + parser.pushSymbol(top.symToParse); + return top.rindex; } @Override - protected int skipSymbol(int sym, int p) throws IOException { - if (rtable.isResolverAction(sym)) { - return skipSymbol(rtable.getResolverActual(sym), -1); + public Symbol doAction(Symbol input, Symbol top) throws IOException { + if (top instanceof Symbol.FieldAdjustAction) { + return input == Symbol.FIELD_ACTION ? top : Symbol.CONTINUE; + } if (top instanceof Symbol.ResolvingAction) { + Symbol.ResolvingAction t = (Symbol.ResolvingAction) top; + if (t.reader != input) { + throw new AvroTypeException("Found " + t.reader + " while looking for " + + input); + } else { + return t.writer; + } + } else if (top instanceof Symbol.SkipAction) { + Symbol symToSkip = ((Symbol.SkipAction) top).symToSkip; + parser.skipSymbol(symToSkip); + } else if (top instanceof Symbol.WriterUnionAction) { + Symbol.Alternative branches = (Symbol.Alternative) parser.popSymbol(); + parser.pushSymbol(branches.getSymbol(in.readIndex())); + } else if (top instanceof Symbol.ErrorAction) { + throw new AvroTypeException(((Symbol.ErrorAction) top).msg); + } else if (top instanceof Symbol.DefaultStartAction) { + Symbol.DefaultStartAction dsa = (Symbol.DefaultStartAction) top; + backup = in; + in = (new JsonDecoder(dsa.root, new ByteArrayInputStream(dsa.contents))); + } else if (top == Symbol.DEFAULT_END_ACTION) { + in = backup; } else { - return super.skipSymbol(sym, p); + throw new AvroTypeException("Unknown action: " + top); } + return Symbol.CONTINUE; } @Override - 
protected int advance(int input) throws IOException { - int top = stack[--pos]; - while (! rtable.isTerminal(top)) { - if (rtable.isAction(top)) { - if (rtable.isFieldAction(top)) { - if (input == ResolvingTable.FIELDACTION) return top; - } else if (rtable.isResolverAction(top)) { - return rtable.getResolverActual(top); - } else if (rtable.isSkipAction(top)) { - skipSymbol(rtable.getProductionToSkip(top), -1); - } else if (rtable.isWriterUnionAction(top)) { - stack[pos++] = rtable.getBranch(top, in.readIndex()); - } else if (rtable.isErrorAction(top)) { - throw new AvroTypeException(rtable.getMessage(top)); - } - } else if (! rtable.isRepeater(top) - || (input != ParsingTable.ARRAYEND - && input != ParsingTable.MAPEND)) { - int plen = rtable.size(top); - if (stack.length < pos + plen) { - stack = expand(stack, pos + plen); - } - System.arraycopy(rtable.prods, top, stack, pos, plen); - pos += plen; - } - top = stack[--pos]; - } - if (top == input) { - return top; - } - throw new AvroTypeException("Attempt to read " + input - + " when a " + top + " was expected."); - } - public void reset() throws IOException { - while (pos > 0) { - if (rtable.isSkipAction(stack[pos - 1])) { - skipProduction(rtable.getProductionToSkip(stack[--pos])); - } else { - throw new AvroTypeException("Data not fully drained."); - } + public void skipAction() throws IOException { + Symbol top = parser.popSymbol(); + if (top instanceof Symbol.ResolvingAction) { + parser.pushSymbol(((Symbol.ResolvingAction) top).writer); + } else if (top instanceof Symbol.SkipAction) { + parser.pushSymbol(((Symbol.SkipAction) top).symToSkip); + } else if (top instanceof Symbol.WriterUnionAction) { + Symbol.Alternative branches = (Symbol.Alternative) parser.popSymbol(); + parser.pushSymbol(branches.getSymbol(in.readIndex())); + } else if (top instanceof Symbol.ErrorAction) { + throw new AvroTypeException(((Symbol.ErrorAction) top).msg); } - stack[pos++] = table.root; } } Modified: 
hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java?rev=796868&r1=796867&r2=796868&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingDecoder.java Wed Jul 22 20:20:55 2009 @@ -20,175 +20,175 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.Arrays; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; +import org.apache.avro.io.parsing.Parser; +import org.apache.avro.io.parsing.Symbol; +import org.apache.avro.io.parsing.ValidatingGrammarGenerator; import org.apache.avro.util.Utf8; -/** {@link Decoder} that ensures that the sequence of operations conforms - * to a schema. - *

See the parser documentation for - * information on how this works. +/** + * An implementation of {@link Decoder} that ensures that the sequence + * of operations conforms to a schema. */ -public class ValidatingDecoder extends Decoder { - protected final Decoder in; - protected int[] stack; - protected int pos; - ParsingTable table; +public class ValidatingDecoder extends ParsingDecoder + implements Parser.ActionHandler { + protected Decoder in; - ValidatingDecoder(ParsingTable table, Decoder in) - throws IOException { + ValidatingDecoder(Symbol root, Decoder in) throws IOException { + super(root); this.in = in; - this.stack = new int[5]; // Start small to make sure expansion code works - this.pos = 0; - this.table = table; - reset(); } - public ValidatingDecoder(Schema schema, Decoder in) - throws IOException { - this(new ParsingTable(schema), in); + public ValidatingDecoder(Schema schema, Decoder in) throws IOException { + this(new ValidatingGrammarGenerator().generate(schema), in); } @Override - public void init(InputStream in) { + public void init(InputStream in) throws IOException { + parser.reset(); this.in.init(in); } @Override public void readNull() throws IOException { - advance(ParsingTable.NULL); + parser.advance(Symbol.NULL); in.readNull(); } @Override public boolean readBoolean() throws IOException { - advance(ParsingTable.BOOLEAN); + parser.advance(Symbol.BOOLEAN); return in.readBoolean(); } @Override public int readInt() throws IOException { - advance(ParsingTable.INT); + parser.advance(Symbol.INT); return in.readInt(); } @Override public long readLong() throws IOException { - advance(ParsingTable.LONG); + parser.advance(Symbol.LONG); return in.readLong(); } @Override public float readFloat() throws IOException { - advance(ParsingTable.FLOAT); + parser.advance(Symbol.FLOAT); return in.readFloat(); } @Override public double readDouble() throws IOException { - advance(ParsingTable.DOUBLE); + parser.advance(Symbol.DOUBLE); return in.readDouble(); } 
@Override public Utf8 readString(Utf8 old) throws IOException { - advance(ParsingTable.STRING); + parser.advance(Symbol.STRING); return in.readString(old); } @Override public void skipString() throws IOException { - advance(ParsingTable.STRING); + parser.advance(Symbol.STRING); in.skipString(); } @Override public ByteBuffer readBytes(ByteBuffer old) throws IOException { - advance(ParsingTable.BYTES); + parser.advance(Symbol.BYTES); return in.readBytes(old); } @Override public void skipBytes() throws IOException { - advance(ParsingTable.BYTES); + parser.advance(Symbol.BYTES); in.skipBytes(); } - @Override - public void readFixed(byte[] bytes, int start, int length) throws IOException { - advance(ParsingTable.FIXED); - int top = stack[--pos]; - assert table.isFixed(top); - if (table.getFixedSize(top) != length) { + private void checkFixed(int size) throws IOException { + parser.advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + if (size != top.size) { throw new AvroTypeException( - "Incorrect length for fiexed binary: expected " + - table.getFixedSize(top) + " but received " + length + " bytes."); + "Incorrect length for fixed binary: expected " + + top.size + " but received " + size + " bytes."); } - in.readFixed(bytes, start, length); + } + + @Override + public void readFixed(byte[] bytes, int start, int len) throws IOException { + checkFixed(len); + in.readFixed(bytes, start, len); } @Override public void skipFixed(int length) throws IOException { - advance(ParsingTable.FIXED); - int top = stack[--pos]; - assert table.isFixed(top); - if (table.getFixedSize(top) != length) { - throw new AvroTypeException( - "Incorrect length for fiexed binary: expected " + - table.getFixedSize(top) + " but received " + length + " bytes."); - } + checkFixed(length); in.skipFixed(length); } + protected void skipFixed() throws IOException { + parser.advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) 
parser.popSymbol(); + in.skipFixed(top.size); + } + @Override public int readEnum() throws IOException { - advance(ParsingTable.ENUM); - int top = stack[--pos]; - assert table.isEnum(top); + parser.advance(Symbol.ENUM); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); int result = in.readEnum(); - if (result < 0 || result >= table.getEnumMax(top)) { + if (result < 0 || result >= top.size) { throw new AvroTypeException( "Enumeration out of range: max is " + - table.getEnumMax(top) + " but received " + result); + top.size + " but received " + result); } return result; } @Override public long readArrayStart() throws IOException { - advance(ParsingTable.ARRAYSTART); + parser.advance(Symbol.ARRAY_START); long result = in.readArrayStart(); if (result == 0) { - advance(ParsingTable.ARRAYEND); + parser.advance(Symbol.ARRAY_END); } return result; } @Override public long arrayNext() throws IOException { - long result = in.readLong(); + long result = in.arrayNext(); if (result == 0) { - advance(ParsingTable.ARRAYEND); + parser.advance(Symbol.ARRAY_END); } return result; } @Override public long skipArray() throws IOException { - advance(ParsingTable.ARRAYSTART); - skipFullArray(stack[--pos]); - advance(ParsingTable.ARRAYEND); + parser.advance(Symbol.ARRAY_START); + for (long c = in.skipArray(); c != 0; c = in.skipArray()) { + while (c-- > 0) { + parser.skipRepeater(); + } + } + parser.advance(Symbol.ARRAY_END); return 0; } @Override public long readMapStart() throws IOException { - advance(ParsingTable.MAPSTART); + parser.advance(Symbol.MAP_START); long result = in.readMapStart(); if (result == 0) { - advance(ParsingTable.MAPEND); + parser.advance(Symbol.MAP_END); } return result; } @@ -197,162 +197,33 @@ public long mapNext() throws IOException { long result = in.mapNext(); if (result == 0) { - advance(ParsingTable.MAPEND); + parser.advance(Symbol.MAP_END); } return result; } @Override public long skipMap() throws IOException { - 
advance(ParsingTable.MAPSTART); - skipFullMap(stack[--pos]); - advance(ParsingTable.MAPEND); + parser.advance(Symbol.MAP_START); + for (long c = in.skipMap(); c != 0; c = in.skipMap()) { + while (c-- > 0) { + parser.skipRepeater(); + } + } + parser.advance(Symbol.MAP_END); return 0; } @Override public int readIndex() throws IOException { - advance(ParsingTable.UNION); - int top = stack[--pos]; - assert table.isUnion(top); + parser.advance(Symbol.UNION); + Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); int result = in.readIndex(); - stack[pos++] = table.getBranch(top, result); + parser.pushSymbol(top.getSymbol(result)); return result; } - - @Override - public void reset() throws IOException { - /* - * See the design note for ValidatingValueWriter.reset() on why such - * a method is needed. - */ - while (pos > 0) { - skipSymbol(stack[pos], -1); - } - stack[pos++] = table.root; - } - - /** Skip the values described by a production. */ - protected void skipProduction(int ntsym) throws IOException { - if (! table.isNonTerminal(ntsym) && ! 
table.isRepeater(ntsym)) { - throw new IllegalArgumentException("Can't skip a " + ntsym); - } - for (int i = table.size(ntsym) - 1; i >= 0; i--) { - int sym = table.prods[ntsym + i]; - if (table.isRepeater(sym)) - continue; // Don't recurse -- our caller will do that for us - - i = skipSymbol(table.prods[ntsym + i], ntsym + i) - ntsym; - } - } - - protected int skipSymbol(int sym, int p) throws IOException { - switch (sym) { - case ParsingTable.NULL: - in.readNull(); - break; - case ParsingTable.BOOLEAN: - in.readBoolean(); - break; - case ParsingTable.INT: - in.readInt(); - break; - case ParsingTable.LONG: - in.readLong(); - break; - case ParsingTable.FLOAT: - in.readFloat(); - break; - case ParsingTable.DOUBLE: - in.readDouble(); - break; - case ParsingTable.STRING: - in.skipString(); - break; - case ParsingTable.BYTES: - in.skipBytes(); - break; - case ParsingTable.FIXED: - in.skipFixed(table.getFixedSize(table.prods[--p])); - break; - case ParsingTable.UNION: - skipSymbol(table.getBranch(table.prods[--p], in.readIndex()), -1); - break; - case ParsingTable.ARRAYSTART: - while (p-- >= 0) { - int element = table.prods[p]; - if (table.isRepeater(element)) { - skipFullArray(element); - break; - } - } - while (table.prods[p] != ParsingTable.ARRAYEND) { - p--; // (skip action syms) - } - break; - case ParsingTable.MAPSTART: - while (p-- >= 0) { - int element = table.prods[p]; - if (table.isRepeater(element)) { - skipFullMap(element); - break; - } - } - while (table.prods[p] != ParsingTable.MAPEND) { - p--; // (skip action syms) - } - break; - default: // record - skipProduction(sym); - } - return p; - } - - private final void skipFullMap(int element) throws IOException { - for (long c = in.skipMap(); c != 0; c = in.skipMap()) { - skipElements(element, c); - } - } - - private final void skipElements(int element, long count) throws IOException { - while (count-- > 0) { - skipProduction(element); - } - } - - private final void skipFullArray(int element) throws 
IOException { - for (long c = in.skipArray(); c != 0; c = in.skipArray()) { - skipElements(element, c); - } - } - - protected int advance(int input) throws IOException { - int top = stack[--pos]; - while (! table.isTerminal(top)) { - if (! table.isRepeater(top) - || (input != ParsingTable.ARRAYEND && input != ParsingTable.MAPEND)) { - int plen = table.size(top); - if (stack.length < pos + plen) { - stack = expand(stack, pos + plen); - } - System.arraycopy(table.prods, top, stack, pos, plen); - pos += plen; - } - top = stack[--pos]; - } - assert table.isTerminal(top); - if (top != input) { - throw new AvroTypeException("Attempt to read a " - + ParsingTable.getTerminalName(input) + " when a " - + ParsingTable.getTerminalName(top) + " was expected."); - } - return top; - } - - protected static int[] expand(int[] stack, int len) { - while (stack.length < len) { - stack = Arrays.copyOf(stack, stack.length + Math.max(stack.length, 1024)); - } - return stack; + + public Symbol doAction(Symbol input, Symbol top) throws IOException { + return Symbol.CONTINUE; } } Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java?rev=796868&r1=796867&r2=796868&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/ValidatingEncoder.java Wed Jul 22 20:20:55 2009 @@ -15,39 +15,41 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.avro.io; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.BitSet; import org.apache.avro.AvroTypeException; import org.apache.avro.Schema; +import org.apache.avro.io.parsing.ValidatingGrammarGenerator; +import org.apache.avro.io.parsing.Parser; +import org.apache.avro.io.parsing.Symbol; import org.apache.avro.util.Utf8; -/** {@link Encoder} that ensures that the sequence of operations conforms - * to a schema. - * - *

See the parser documentation for - * information on how this works.*/ -public class ValidatingEncoder extends Encoder { - private final Encoder out; - protected int[] stack; - protected int pos; - protected ParsingTable table; +/** + * An implementation of {@link Encoder} that ensures that the sequence + * of operations conforms to a schema. + */ +public class ValidatingEncoder extends ParsingEncoder + implements Parser.ActionHandler { + protected Encoder out; + protected final Parser parser; + /** + * Has anything been written into the collections? + */ + protected BitSet isEmpty = new BitSet(); - public ValidatingEncoder(Schema schema, Encoder out) { + ValidatingEncoder(Symbol root, Encoder out) throws IOException { this.out = out; - this.stack = new int[5]; // Start small to make sure expansion code works - this.pos = 0; - this.table = new ParsingTable(schema); - stack[pos++] = table.root; + this.parser = new Parser(root, this); } - @Override - public void init(OutputStream out) throws IOException { - flush(); - this.out.init(out); + public ValidatingEncoder(Schema schema, Encoder in) throws IOException { + this(new ValidatingGrammarGenerator().generate(schema), in); } @Override @@ -56,170 +58,145 @@ } @Override + public void init(OutputStream out) throws IOException { + flush(); + parser.reset(); + this.out.init(out); + } + + @Override public void writeNull() throws IOException { - advance(ParsingTable.NULL); + parser.advance(Symbol.NULL); out.writeNull(); } - + @Override public void writeBoolean(boolean b) throws IOException { - advance(ParsingTable.BOOLEAN); + parser.advance(Symbol.BOOLEAN); out.writeBoolean(b); } @Override public void writeInt(int n) throws IOException { - advance(ParsingTable.INT); - out.writeLong(n); + parser.advance(Symbol.INT); + out.writeInt(n); } @Override public void writeLong(long n) throws IOException { - advance(ParsingTable.LONG); + parser.advance(Symbol.LONG); out.writeLong(n); } - + @Override public void writeFloat(float f) 
throws IOException { - advance(ParsingTable.FLOAT); + parser.advance(Symbol.FLOAT); out.writeFloat(f); } @Override public void writeDouble(double d) throws IOException { - advance(ParsingTable.DOUBLE); + parser.advance(Symbol.DOUBLE); out.writeDouble(d); } @Override public void writeString(Utf8 utf8) throws IOException { - advance(ParsingTable.STRING); + parser.advance(Symbol.STRING); out.writeString(utf8); } @Override public void writeBytes(ByteBuffer bytes) throws IOException { - advance(ParsingTable.BYTES); + parser.advance(Symbol.BYTES); out.writeBytes(bytes); } @Override public void writeBytes(byte[] bytes, int start, int len) throws IOException { - advance(ParsingTable.BYTES); + parser.advance(Symbol.BYTES); out.writeBytes(bytes, start, len); } @Override - public void writeFixed(byte[] bytes, int start, int length) - throws IOException { - advance(ParsingTable.FIXED); - int top = stack[--pos]; - assert table.isFixed(top); - if (table.getFixedSize(top) != length) { + public void writeFixed(byte[] bytes, int start, int len) throws IOException { + parser.advance(Symbol.FIXED); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + if (len != top.size) { throw new AvroTypeException( - "Incorrect length for fiexed binary: expected " + - table.getFixedSize(top) + " but received " + length + " bytes."); + "Incorrect length for fixed binary: expected " + + top.size + " but received " + len + " bytes."); } - out.writeFixed(bytes, start, length); + out.writeFixed(bytes, start, len); } @Override public void writeEnum(int e) throws IOException { - advance(ParsingTable.ENUM); - int top = stack[--pos]; - assert table.isEnum(top); - if (e < 0 || e >= table.getEnumMax(top)) { + parser.advance(Symbol.ENUM); + Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol(); + if (e < 0 || e >= top.size) { throw new AvroTypeException( - "Enumeration out of range: max is " + - table.getEnumMax(top) + " but received " + e); + "Enumeration out of range: 
max is " + + top.size + " but received " + e); } out.writeEnum(e); } @Override public void writeArrayStart() throws IOException { - advance(ParsingTable.ARRAYSTART); + push(); + parser.advance(Symbol.ARRAY_START); out.writeArrayStart(); } @Override - public void setItemCount(long itemCount) throws IOException { - out.setItemCount(itemCount); - } - - @Override - public void startItem() throws IOException { - out.startItem(); - } - - @Override public void writeArrayEnd() throws IOException { - advance(ParsingTable.ARRAYEND); + parser.advance(Symbol.ARRAY_END); out.writeArrayEnd(); + pop(); } @Override public void writeMapStart() throws IOException { - advance(ParsingTable.MAPSTART); + push(); + parser.advance(Symbol.MAP_START); out.writeMapStart(); } @Override public void writeMapEnd() throws IOException { - advance(ParsingTable.MAPEND); + parser.advance(Symbol.MAP_END); out.writeMapEnd(); + pop(); + } + + @Override + public void setItemCount(long itemCount) throws IOException { + super.setItemCount(itemCount); + out.setItemCount(itemCount); + } + + @Override + public void startItem() throws IOException { + super.startItem(); + out.startItem(); } @Override public void writeIndex(int unionIndex) throws IOException { - advance(ParsingTable.UNION); - int top = stack[--pos]; - assert table.isUnion(top); - stack[pos++] = table.getBranch(top, unionIndex); + parser.advance(Symbol.UNION); + Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol(); + parser.pushSymbol(top.getSymbol(unionIndex)); out.writeIndex(unionIndex); } - private int advance(int input) { - int top = stack[--pos]; - while (! table.isTerminal(top)) { - if (! 
table.isRepeater(top) - || (input != ParsingTable.ARRAYEND && input != ParsingTable.MAPEND)) { - int plen = table.size(top); - if (stack.length < pos + plen) { - stack = ValidatingDecoder.expand(stack, pos + plen); - } - System.arraycopy(table.prods, top, stack, pos, plen); - pos += plen; - } - top = stack[--pos]; - } - assert table.isTerminal(top); - if (top != input) { - throw new AvroTypeException("Attempt to write a " + - ParsingTable.getTerminalName(input) + " when a " - + ParsingTable.getTerminalName(top) + " was expected."); - } - return top; + @Override + public Symbol doAction(Symbol input, Symbol top) throws IOException { + return Symbol.CONTINUE; } - /** - * After writing a complete object that conforms to the schema or after an - * error, if you want to start writing another - * object, call this method. - */ - public void reset() { - /* - * Design note: Why is this a separate method? Why can't we auto reset - * in advance() when the stack becomes empty which is an indication that - * an object has been completed? - * - * (1) advance() is called in an inner loop. Saving cycles there is worthy. - * (2) The client may actually be writing something beyond the current - * object and the writer will think it has started writing the next object. - * This may lead to certain errors not being caught. - * (3) We need reset() anyway to take the writer out of any error state. - */ - pos = 0; - stack[pos++] = table.root; + /** Have we written at least one item into the current collection? 
*/ + protected final boolean isTopEmpty() { + return isEmpty.get(pos); } } Added: hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/JsonGrammarGenerator.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io.parsing; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; + +/** + * The class that generates a grammar suitable to parse Avro data + * in JSON format. + */ + +public class JsonGrammarGenerator extends ValidatingGrammarGenerator { + /** + * Returns the non-terminal that is the start symbol + * for the grammar for the grammar for the given schema sc. 
+ */ + public Symbol generate(Schema schema) { + return Symbol.root(Symbol.END, + generate(schema, new HashMap())); + } + + /** + * Returns the non-terminal that is the start symbol + * for grammar of the given schema sc. If there is already an entry + * for the given schema in the given map seen then + * that entry is returned. Otherwise a new symbol is generated and + * an entry is inserted into the map. + * @param sc The schema for which the start symbol is required + * @param seen A map of schema to symbol mapping done so far. + * @return The start symbol for the schema + */ + public Symbol generate(Schema sc, Map seen) { + switch (sc.getType()) { + case NULL: + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case STRING: + case BYTES: + case FIXED: + case UNION: + return super.generate(sc, seen); + case ENUM: + return Symbol.seq(new Symbol.EnumLabelsAction(sc.getEnumSymbols()), + Symbol.ENUM); + case ARRAY: + return Symbol.seq(Symbol.ARRAY_END, + Symbol.repeat(Symbol.ARRAY_END, + Symbol.ITEM_END, generate(sc.getElementType(), seen)), + Symbol.ARRAY_START); + case MAP: + return Symbol.seq(Symbol.MAP_END, + Symbol.repeat(Symbol.MAP_END, + Symbol.ITEM_END, generate(sc.getValueType(), seen), + Symbol.MAP_KEY_MARKER, Symbol.STRING), + Symbol.MAP_START); + case RECORD: { + LitS wsc = new LitS(sc); + Symbol rresult = seen.get(wsc); + if (rresult == null) { + Symbol[] production = new Symbol[sc.getFields().size() * 2 + 2]; + rresult = Symbol.seq(production); + seen.put(wsc, rresult); + + int i = production.length; + int n = 0; + production[--i] = Symbol.RECORD_START; + for (Map.Entry f : sc.getFields().entrySet()) { + production[--i] = new Symbol.FieldAdjustAction(n, f.getKey()); + production[--i] = generate(f.getValue().schema(), seen); + n++; + } + production[--i] = Symbol.RECORD_END; + } + return rresult; + } + default: + throw new RuntimeException("Unexpected schema type"); + } + } +} Added: 
hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/Parser.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/Parser.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/Parser.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/Parser.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io.parsing; + +import java.io.IOException; +import java.util.Arrays; + +import org.apache.avro.AvroTypeException; + +/** + * Parser is the class that maintains the stack for parsing. This class + * is used by encoders, which are not required to skip. + */ +public class Parser { + /** + * The parser knows how to handle the terminal and non-terminal + * symbols. But it needs help from outside to handle implicit + * and explicit actions. The clients implement this interface to + * provide this help. 
+ */ + public interface ActionHandler { + public Symbol doAction(Symbol input, Symbol top) throws IOException; + } + + protected final ActionHandler symbolHandler; + protected Symbol[] stack; + protected int pos; + + public Parser(Symbol root, ActionHandler symbolHandler) + throws IOException { + this.symbolHandler = symbolHandler; + this.stack = new Symbol[5]; // Start small to make sure expansion code works + this.stack[0] = root; + this.pos = 1; + } + + /** + * If there is no sufficient room in the stack, use this expand it. + */ + private final void expandStack() { + stack = Arrays.copyOf(stack, stack.length+Math.max(stack.length,1024)); + } + + /** + * Recursively replaces the symbol at the top of the stack with its + * production, until the top is a terminal. Then checks if the + * top symbol matches the terminal symbol suppled terminal. + * @param input The symbol to match against the terminal at the + * top of the stack. + * @return The terminal symbol at the top of the stack unless an + * implicit action resulted in another symbol, in which case that + * symbol is returned. + */ + public final Symbol advance(Symbol input) throws IOException { + for (; ;) { + Symbol top = stack[--pos]; + if (top.kind == Symbol.Kind.TERMINAL) { + if (top == input) { + return top; // A common case + } else { + throw new AvroTypeException("Attempt to process a " + + input + " when a " + + top + " was expected."); + } + } else if (top.kind == Symbol.Kind.IMPLICIT_ACTION) { + Symbol result = symbolHandler.doAction(input, top); + if (result != Symbol.CONTINUE) { + return result; + } + } else { + pushProduction(input, top); + } + } + } + + /** + * Pushes the production for the given symbol sym. + * If sym is a repeater and input is either + * {@link Symbol#ARRAY_END} or {@link Symbol#MAP_END} pushes nothing. 
+ * @param input + * @param sym + */ + public final void pushProduction(Symbol input, Symbol sym) { + if (sym.kind != Symbol.Kind.REPEATER || + input != ((Symbol.Repeater) sym).end) { + Symbol[] p = sym.production; + while (pos + p.length > stack.length) { + expandStack(); + } + System.arraycopy(p, 0, stack, pos, p.length); + pos += p.length; + } + } + + /** + * Pops and returns the top symbol from the stack. + */ + public Symbol popSymbol() { + return stack[--pos]; + } + + /** + * Returns the top symbol from the stack. + */ + public Symbol topSymbol() { + return stack[pos - 1]; + } + + /** + * Pushes sym on to the stack. + */ + public void pushSymbol(Symbol sym) { + if (pos == stack.length) { + expandStack(); + } + stack[pos++] = sym; + } + + /** + * Returns the depth of the stack. + */ + public int depth() { + return pos; + } + + public void reset() { + pos = 1; + } +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/ResolvingGrammarGenerator.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io.parsing; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.avro.AvroTypeException; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; + +/** + * The class that generates a resolving grammar to resolve between two + * schemas. + */ +public class ResolvingGrammarGenerator extends ValidatingGrammarGenerator { + /** + * Resolves the writer schema writer and the reader schema + * reader and returns the start symbol for the grammar generated. + * @param writer The schema used by the writer + * @param reader The schema used by the reader + * @return The start symbol for the resolving grammar + * @throws IOException + */ + public final Symbol generate(Schema writer, Schema reader) + throws IOException { + return Symbol.root(generate(writer, reader, new HashMap())); + } + + /** + * Resolves the writer schema writer and the reader schema + * reader and returns the start symbol for the grammar generated. + * If there is already a symbol in the map seen for resolving the + * two schemas, then that symbol is returned. Otherwise a new symbol is + * generated and returned. + * @param writer The schema used by the writer + * @param reader The schema used by the reader + * @param seen The <reader-schema, writer-schema> to symbol + * map of start symbols of resolving grammars so far. 
+ * @return The start symbol for the resolving grammar + * @throws IOException + */ + public Symbol generate(Schema writer, Schema reader, + Map seen) throws IOException + { + final Schema.Type writerType = writer.getType(); + final Schema.Type readerType = reader.getType(); + + if (writerType == readerType) { + switch (writerType) { + case NULL: + return Symbol.NULL; + case BOOLEAN: + return Symbol.BOOLEAN; + case INT: + return Symbol.INT; + case LONG: + return Symbol.LONG; + case FLOAT: + return Symbol.FLOAT; + case DOUBLE: + return Symbol.DOUBLE; + case STRING: + return Symbol.STRING; + case BYTES: + return Symbol.BYTES; + case FIXED: + if (writer.getName().equals(reader.getName()) + && writer.getFixedSize() == reader.getFixedSize()) { + return Symbol.seq(new Symbol.IntCheckAction(writer.getFixedSize()), + Symbol.FIXED); + } + break; + + case ENUM: /* mkEnumAdjust takes (reader symbols, writer symbols): one adjustment per writer symbol, giving its index in the reader's list */ + if (writer.getName() == null + || writer.getName().equals(reader.getName())) { + return Symbol.seq(mkEnumAdjust(reader.getEnumSymbols(), + writer.getEnumSymbols()), Symbol.ENUM); + } + break; + + case ARRAY: + return Symbol.seq(Symbol.ARRAY_END, + Symbol.repeat(Symbol.ARRAY_END, + generate(writer.getElementType(), + reader.getElementType(), seen)), + Symbol.ARRAY_START); + + case MAP: + return Symbol.seq(Symbol.MAP_END, + Symbol.repeat(Symbol.MAP_END, + generate(writer.getValueType(), + reader.getValueType(), seen), Symbol.STRING), + Symbol.MAP_START); + case RECORD: + return resolveRecords(writer, reader, seen); + case UNION: + return resolveUnion(writer, reader, seen); + default: + throw new AvroTypeException("Unkown type for schema: " + writerType); + } + } else { // writer and reader are of different types + if (writerType == Schema.Type.UNION) { + return resolveUnion(writer, reader, seen); + } + + switch (readerType) { + case LONG: + switch (writerType) { + case INT: + case DOUBLE: + case FLOAT: + return Symbol.resolve(super.generate(writer, seen), Symbol.LONG); + } + break; + + case DOUBLE: + switch 
(writerType) { + case INT: + case LONG: + case FLOAT: + return Symbol.resolve(super.generate(writer, seen), Symbol.DOUBLE); + } + break; + + case UNION: + int j = bestBranch(reader, writer); + if (j >= 0) { + Symbol s = generate(writer, reader.getTypes().get(j), seen); + return Symbol.seq(new Symbol.UnionAdjustAction(j, s), Symbol.UNION); + } + break; + case NULL: + case BOOLEAN: + case INT: + case STRING: + case FLOAT: + case BYTES: + case ENUM: + case ARRAY: + case MAP: + case RECORD: + break; + default: + throw new RuntimeException("Unexpected schema type: " + readerType); + } + } + return Symbol.error("Found " + writer + ", expecting " + reader); + } + + private Symbol resolveUnion(Schema writer, Schema reader, + Map seen) throws IOException { + List alts = writer.getTypes(); + final int size = alts.size(); + Symbol[] symbols = new Symbol[size]; + String[] labels = new String[size]; + + /** + * We construct a symbol without filling the arrays. Please see + * {@link Symbol#production} for the reason. + */ + int i = 0; + for (Schema w : alts) { + symbols[i] = generate(w, reader, seen); + labels[i] = w.getType().name(); + i++; + } + return Symbol.seq(Symbol.alt(symbols, labels), + new Symbol.WriterUnionAction()); + } + + private Symbol resolveRecords(Schema writer, Schema reader, + Map seen) throws IOException { + LitS wsc = new LitS2(writer, reader); + Symbol result = seen.get(wsc); + if (result == null) { + // first count the number of entries in the production; + int count = 0; + + Map wfields = writer.getFields(); + Map rfields = reader.getFields(); + for (String fn : wfields.keySet()) { + count += (rfields.get(fn) == null) ? 
1 : 2; + } + + for (Map.Entry rfe : rfields.entrySet()) { + String fname = rfe.getKey(); + if (wfields.get(fname) == null) { + Field rf = rfe.getValue(); + if (rf.defaultValue() == null) { + result = Symbol.error("Found " + writer + ", expecting " + reader); + seen.put(wsc, result); + return result; + } else { + count += 4; + } + } + } + + Symbol[] production = new Symbol[count]; + + /** + * We construct a symbol without filling the array. Please see + * {@link Symbol#production} for the reason. + */ + result = Symbol.seq(production); + seen.put(wsc, result); + + /* + * For now every field in read-record with no default value + * must be in write-record. + * Write record may have additional fields, which will be + * skipped during read. + */ + + // Handle all the writer's fields + for (Map.Entry wfe : wfields.entrySet()) { + String fname = wfe.getKey(); + Field rf = rfields.get(fname); + if (rf == null) { + production[--count] = + new Symbol.SkipAction(super.generate(wfe.getValue().schema(), + seen)); + } else { + production[--count] = + new Symbol.FieldAdjustAction(rf.pos(), fname); + production[--count] = + generate(wfe.getValue().schema(), rf.schema(), seen); + } + } + + // Add default values for fields missing from Writer + for (Map.Entry rfe : rfields.entrySet()) { + String fname = rfe.getKey(); + Field wf = wfields.get(fname); + if (wf == null) { + Field rf = rfe.getValue(); + production[--count] = new Symbol.FieldAdjustAction(rf.pos(), fname); + production[--count] = new Symbol.DefaultStartAction( + new JsonGrammarGenerator().generate(rf.schema()), + rf.defaultValue()); + production[--count] = super.generate(rf.schema(), seen); + production[--count] = Symbol.DEFAULT_END_ACTION; + } + } + } + return result; + } + + private static Symbol mkEnumAdjust(List rsymbols, + List wsymbols){ + Object[] adjustments = new Object[wsymbols.size()]; + for (int i = 0; i < adjustments.length; i++) { + int j = rsymbols.indexOf(wsymbols.get(i)); + adjustments[i] = (j == -1 ? 
"No match for " + wsymbols.get(i) + : new Integer(j)); + } + return new Symbol.EnumAdjustAction(rsymbols.size(), adjustments); + } + + private static int bestBranch(Schema r, Schema w) { + Schema.Type vt = w.getType(); + // first scan for exact match + int j = 0; + for (Schema b : r.getTypes()) { + if (vt == b.getType()) + if (vt == Schema.Type.RECORD) { + String vname = w.getName(); + if (vname == null || vname.equals(b.getName())) + return j; + } else + return j; + j++; + } + + // then scan match via numeric promotion + j = 0; + for (Schema b : r.getTypes()) { + switch (vt) { + case INT: + switch (b.getType()) { + case LONG: case DOUBLE: + return j; + } + break; + case LONG: + case FLOAT: + switch (b.getType()) { + case DOUBLE: + return j; + } + break; + } + j++; + } + return -1; + } + + /** + * Clever trick which differentiates items put into + * seen by {@link ValidatingGrammarGenerator#validating validating()} + * from those put in by {@link ValidatingGrammarGenerator#resolving resolving()}. + */ + static class LitS2 extends ValidatingGrammarGenerator.LitS { + public Schema expected; + public LitS2(Schema actual, Schema expected) { + super(actual); + this.expected = expected; + } + public boolean equals(Object o) { + if (! 
(o instanceof LitS2)) return false; + LitS2 other = (LitS2) o; + return actual == other.actual && expected == other.expected; + } + public int hashCode() { + return super.hashCode() + expected.hashCode(); + } + } +} Added: hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/SkipParser.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/SkipParser.java?rev=796868&view=auto ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/SkipParser.java (added) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/parsing/SkipParser.java Wed Jul 22 20:20:55 2009 @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io.parsing; + +import java.io.IOException; + +/** + * A parser that is capable of skipping as well as reading and writing. This class is + * used by decoders which (unlike encoders) are required to implement + * methods to skip. + */ +public class SkipParser extends Parser { + /** + * The clients implement this interface to skip symbols and actions. + */ + public interface SkipHandler { + /** + * Skips the action at the top of the stack. 
+ */ + public void skipAction() throws IOException; + + /** + * Skips the symbol at the top of the stack. + */ + public void skipTopSymbol() throws IOException; + } + + private final SkipHandler skipHandler; + + public SkipParser(Symbol root, ActionHandler symbolHandler, + SkipHandler skipHandler) + throws IOException { + super(root, symbolHandler); + this.skipHandler = skipHandler; + } + + /** + * Skips data by calling skipXyz or + * readXyz methods on this, until the + * parser stack reaches the target level. + */ + public final void skipTo(int target) throws IOException { + while (target < pos) { + Symbol top = stack[pos - 1]; + while (top.kind != Symbol.Kind.TERMINAL) { + if (top.kind == Symbol.Kind.IMPLICIT_ACTION + || top.kind == Symbol.Kind.EXPLICIT_ACTION) { + skipHandler.skipAction(); + } else { + --pos; + pushProduction(null, top); + } + top = stack[pos - 1]; + } + skipHandler.skipTopSymbol(); + } + } + + /** + * Skips the repeater at the top of the stack. + */ + public final void skipRepeater() throws IOException { + int target = pos; + Symbol repeater = stack[--pos]; + assert repeater.kind == Symbol.Kind.REPEATER; + pushProduction(null, repeater); + skipTo(target); + } + + /** + * Pushes the given symbol on to the stack and skips it. + * @param symToSkip The symbol that should be skipped. + */ + public final void skipSymbol(Symbol symToSkip) throws IOException { + int target = pos; + pushSymbol(symToSkip); + skipTo(target); + } +}