Return-Path: Delivered-To: apmail-hadoop-avro-commits-archive@minotaur.apache.org Received: (qmail 43488 invoked from network); 3 Sep 2009 21:20:19 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 3 Sep 2009 21:20:19 -0000 Received: (qmail 49004 invoked by uid 500); 3 Sep 2009 21:20:18 -0000 Delivered-To: apmail-hadoop-avro-commits-archive@hadoop.apache.org Received: (qmail 48980 invoked by uid 500); 3 Sep 2009 21:20:18 -0000 Mailing-List: contact avro-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: avro-dev@hadoop.apache.org Delivered-To: mailing list avro-commits@hadoop.apache.org Received: (qmail 48970 invoked by uid 99); 3 Sep 2009 21:20:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Sep 2009 21:20:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 03 Sep 2009 21:20:11 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 904022388876; Thu, 3 Sep 2009 21:19:51 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r811128 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/ src/java/org/apache/avro/generic/ src/java/org/apache/avro/io/ src/java/org/apache/avro/specific/ src/test/java/org/apache/avro/ src/test/schemata/ Date: Thu, 03 Sep 2009 21:19:49 -0000 To: avro-commits@hadoop.apache.org From: cutting@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090903211951.904022388876@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cutting Date: Thu Sep 3 21:19:46 2009 New Revision: 811128 URL: http://svn.apache.org/viewvc?rev=811128&view=rev Log: AVRO-109. 
Add Java support for controlling sort order. Modified: hadoop/avro/trunk/CHANGES.txt hadoop/avro/trunk/src/java/org/apache/avro/Schema.java hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java hadoop/avro/trunk/src/test/schemata/simple.avpr Modified: hadoop/avro/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=811128&r1=811127&r2=811128&view=diff ============================================================================== --- hadoop/avro/trunk/CHANGES.txt (original) +++ hadoop/avro/trunk/CHANGES.txt Thu Sep 3 21:19:46 2009 @@ -42,6 +42,11 @@ Comparable. The implementation is consistent with the binary comparator added in AVRO-108. (cutting) + AVRO-109. Add Java support for controlling sort order via schema + annotations. Record fields now support an "order" attribute whose + possible values are "ascending" (the default), "descending", and + "ignore". (cutting) + IMPROVEMENTS AVRO-71. C++: make deserializer more generic. (Scott Banachowski Modified: hadoop/avro/trunk/src/java/org/apache/avro/Schema.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Schema.java?rev=811128&r1=811127&r2=811128&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/Schema.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/Schema.java Thu Sep 3 21:19:46 2009 @@ -223,18 +223,33 @@ /** A field within a record. */ public static class Field { + + /** How values of this field should be ordered when sorting records. 
*/ + public enum Order { + ASCENDING, DESCENDING, IGNORE; + private String name; + private Order() { this.name = this.name().toLowerCase(); } + }; + private int position = -1; private final Schema schema; private final JsonNode defaultValue; + private final Order order; + public Field(Schema schema, JsonNode defaultValue) { + this(schema, defaultValue, Order.ASCENDING); + } + public Field(Schema schema, JsonNode defaultValue, Order order) { this.schema = schema; this.defaultValue = defaultValue; + this.order = order; } /** The position of this field within the record. */ public int pos() { return position; } /** This field's {@link Schema}. */ public Schema schema() { return schema; } public JsonNode defaultValue() { return defaultValue; } + public Order order() { return order; } public boolean equals(Object other) { if (other == this) return true; if (!(other instanceof Field)) return false; @@ -245,6 +260,7 @@ ? that.defaultValue == null : (defaultValue.equals(that.defaultValue))); } + public int hashCode() { return schema.hashCode(); } } private static abstract class NamedSchema extends Schema { @@ -377,6 +393,8 @@ gen.writeFieldName("default"); gen.writeTree(entry.getValue().defaultValue()); } + if (entry.getValue().order() != Field.Order.ASCENDING) + gen.writeStringField("order", entry.getValue().order().name); gen.writeEndObject(); } gen.writeEndArray(); @@ -657,8 +675,12 @@ if (fieldTypeNode == null) throw new SchemaParseException("No field type: "+field); Schema fieldSchema = parse(fieldTypeNode, names); + Field.Order order = Field.Order.ASCENDING; + JsonNode orderNode = field.get("order"); + if (orderNode != null) + order = Field.Order.valueOf(orderNode.getTextValue().toUpperCase()); fields.put(fieldNameNode.getTextValue(), - new Field(fieldSchema, field.get("default"))); + new Field(fieldSchema, field.get("default"), order)); } result.setFields(fields); return result; Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java URL: 
http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java?rev=811128&r1=811127&r2=811128&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java Thu Sep 3 21:19:46 2009 @@ -389,10 +389,14 @@ case RECORD: GenericRecord r1 = (GenericRecord)o1; GenericRecord r2 = (GenericRecord)o2; - for (Map.Entry e : s.getFieldSchemas()) { - String field = e.getKey(); - int compare = compare(r1.get(field), r2.get(field), e.getValue()); - if (compare != 0) return compare; + for (Map.Entry e : s.getFields().entrySet()) { + Field f = e.getValue(); + if (f.order() == Field.Order.IGNORE) + continue; // ignore this field + String name = e.getKey(); + int compare = compare(r1.get(name), r2.get(name), f.schema()); + if (compare != 0) // not equal + return f.order() == Field.Order.DESCENDING ? -compare : compare; } return 0; case ENUM: Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java?rev=811128&r1=811127&r2=811128&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java Thu Sep 3 21:19:46 2009 @@ -18,17 +18,46 @@ package org.apache.avro.io; import java.util.Map; +import java.io.ByteArrayInputStream; +import java.io.IOException; import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.avro.AvroRuntimeException; +import org.apache.avro.generic.GenericDatumReader; /** Utilities for binary-encoded data. 
*/ public class BinaryData { private BinaryData() {} // no public ctor - private static final int GT = -1; - private static final int LT = -2; + private static class Buffer extends ByteArrayInputStream { + public Buffer() { super(new byte[0]); } + public byte[] buf() { return buf; } + public int pos() { return pos; } + public void skip(int i) { this.pos += i; } + public void set(byte[] buf, int pos) { + this.buf = buf; + this.pos = pos; + this.count = buf.length; + } + } + + private static class Decoders { + private final Buffer b1, b2; + private final Decoder d1, d2; + public Decoders() { + this.b1 = new Buffer(); + this.b2 = new Buffer(); + this.d1 = new BinaryDecoder(b1); + this.d2 = new BinaryDecoder(b2); + } + } + + private static final ThreadLocal DECODERS + = new ThreadLocal() { + @Override protected Decoders initialValue() { return new Decoders(); } + }; /** Compare binary encoded data. If equal, return zero. If greater-than, * return 1, if less than return -1. Order is consistent with that of {@link @@ -36,103 +65,107 @@ public static int compare(byte[] b1, int s1, byte[] b2, int s2, Schema schema) { - int comp = comp(b1, s1, b2, s2, schema); // compare - return (comp >= 0) ? 0 : ((comp == GT) ? 1 : -1); // decode comparison + Decoders decoders = DECODERS.get(); + decoders.b1.set(b1, s1); + decoders.b2.set(b2, s2); + try { + return compare(decoders, schema); + } catch (IOException e) { + throw new AvroRuntimeException(e); + } } - + /** If equal, return the number of bytes consumed. If greater than, return * GT, if less than, return LT. 
*/ - private static int comp(byte[] b1, int s1, byte[] b2, int s2, Schema schema) { + private static int compare(Decoders d, Schema schema) throws IOException { + Decoder d1 = d.d1; Decoder d2 = d.d2; switch (schema.getType()) { case RECORD: { - int size = 0; - for (Map.Entry entry : schema.getFieldSchemas()) { - int comp = comp(b1, s1+size, b2, s2+size, entry.getValue()); - if (comp < 0) return comp; - size += comp; + for (Map.Entry entry : schema.getFields().entrySet()) { + Field field = entry.getValue(); + if (field.order() == Field.Order.IGNORE) { + GenericDatumReader.skip(field.schema(), d1); + GenericDatumReader.skip(field.schema(), d2); + continue; + } + int c = compare(d, field.schema()); + if (c != 0) + return (field.order() != Field.Order.DESCENDING) ? c : -c; } - return size; + return 0; } case ENUM: case INT: case LONG: { - long l1 = readLong(b1, s1); - long l2 = readLong(b2, s2); - return (l1 == l2) ? longSize(l1) : ((l1 > l2) ? GT : LT); + long l1 = d1.readLong(); + long l2 = d2.readLong(); + return l1 == l2 ? 0 : (l1 > l2 ? 1 : -1); } case ARRAY: { long i = 0; // position in array long r1 = 0, r2 = 0; // remaining in current block long l1 = 0, l2 = 0; // total array length - int size1 = 0, size2 = 0; // total array size while (true) { if (r1 == 0) { // refill blocks(s) - r1 = readLong(b1, s1+size1); - size1 += longSize(r1); + r1 = d1.readLong(); + if (r1 < 0) { r1 = -r1; d1.readLong(); } l1 += r1; } if (r2 == 0) { - r2 = readLong(b2, s2+size2); - size2 += longSize(r2); + r2 = d2.readLong(); + if (r2 < 0) { r2 = -r2; d2.readLong(); } l2 += r2; } if (r1 == 0 || r2 == 0) // empty block: done - return (l1 == l2) ? size1 : ((l1 > l2) ? GT : LT); + return (l1 == l2) ? 0 : ((l1 > l2) ? 
1 : -1); long l = Math.min(l1, l2); while (i < l) { // compare to end of block - int comp = comp(b1, s1+size1, b2, s2+size2, schema.getElementType()); - if (comp < 0) return comp; - size1 += comp; - size2 += comp; + int c = compare(d, schema.getElementType()); + if (c != 0) return c; i++; r1--; r2--; } } - } + } case MAP: throw new AvroRuntimeException("Can't compare maps!"); case UNION: { - int i1 = readInt(b1, s1); - int i2 = readInt(b2, s2); + int i1 = d1.readInt(); + int i2 = d2.readInt(); if (i1 == i2) { - int size = intSize(i1); - return comp(b1, s1+size, b2, s2+size, schema.getTypes().get(i1)); + return compare(d, schema.getTypes().get(i1)); } else { - return (i1 > i2) ? GT : LT; + return i1 - i2; } } case FIXED: { int size = schema.getFixedSize(); - int c = compareBytes(b1, s1, size, b2, s2, size); - return (c == 0) ? size : ((c > 0) ? GT : LT); + int c = compareBytes(d.b1.buf(), d.b1.pos(), size, + d.b2.buf(), d.b2.pos(), size); + d.b1.skip(size); + d.b2.skip(size); + return c; } case STRING: case BYTES: { - int l1 = readInt(b1, s1); - int l2 = readInt(b2, s2); - int size1 = intSize(l1); - int size2 = intSize(l2); - int c = compareBytes(b1, s1+size1, l1, b2, s2+size2, l2); - return (c == 0) ? size1+l1 : ((c > 0) ? GT : LT); + int l1 = d1.readInt(); + int l2 = d2.readInt(); + int c = compareBytes(d.b1.buf(), d.b1.pos(), l1, + d.b2.buf(), d.b2.pos(), l2); + d.b1.skip(l1); + d.b2.skip(l2); + return c; } case FLOAT: { - int n1 = 0, n2 = 0; - for (int i = 0, shift = 0; i < 4; i++, shift += 8) { - n1 |= (b1[s1+i] & 0xff) << shift; - n2 |= (b2[s2+i] & 0xff) << shift; - } - float f1 = Float.intBitsToFloat(n1); - float f2 = Float.intBitsToFloat(n2); - return (f1 == f2) ? 4 : ((f1 > f2) ? GT : LT); + float f1 = d1.readFloat(); + float f2 = d2.readFloat(); + return (f1 == f2) ? 0 : ((f1 > f2) ? 
1 : -1); } case DOUBLE: { - long n1 = 0, n2 = 0; - for (int i = 0, shift = 0; i < 8; i++, shift += 8) { - n1 |= (b1[s1+i] & 0xffL) << shift; - n2 |= (b2[s2+i] & 0xffL) << shift; - } - double d1 = Double.longBitsToDouble(n1); - double d2 = Double.longBitsToDouble(n2); - return (d1 == d2) ? 8 : ((d1 > d2) ? GT : LT); + double f1 = d1.readDouble(); + double f2 = d2.readDouble(); + return (f1 == f2) ? 0 : ((f1 > f2) ? 1 : -1); } case BOOLEAN: - return b1[s1] == b2[s2] ? 1 : ((b1[s1] > b2[s2]) ? GT : LT); + boolean b1 = d1.readBoolean(); + boolean b2 = d2.readBoolean(); + return (b1 == b2) ? 0 : (b1 ? 1 : -1); case NULL: return 0; default: @@ -156,36 +189,4 @@ return l1 - l2; } - private static int readInt(byte[] b, int s) { - long l = readLong(b, s); - if (l < Integer.MIN_VALUE || Integer.MAX_VALUE < l) - throw new AvroRuntimeException("Integer overflow."); - return (int)l; - } - - - private static long readLong(byte[] buffer, int s) { - long n = 0; - for (int shift = 0; ; shift += 7) { - long b = buffer[s++]; - n |= (b & 0x7F) << shift; - if ((b & 0x80) == 0) { - break; - } - } - return (n >>> 1) ^ -(n & 1); // back to two's-complement - } - - private static int intSize(int i) { return longSize(i); } - - private static int longSize(long n) { - int size = 1; - n = (n << 1) ^ (n >> 63); // move sign to low-order bit - while ((n & ~0x7F) != 0) { - size++; - n >>>= 7; - } - return size; - } - } Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java?rev=811128&r1=811127&r2=811128&view=diff ============================================================================== --- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java (original) +++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java Thu Sep 3 21:19:46 2009 @@ -17,9 +17,10 @@ */ package org.apache.avro.specific; -import java.util.Map; +import 
java.util.Iterator; import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.avro.reflect.ReflectData; /** Utilities for generated Java classes and interfaces. */ @@ -48,11 +49,14 @@ case RECORD: SpecificRecord r1 = (SpecificRecord)o1; SpecificRecord r2 = (SpecificRecord)o2; - int i = 0; - for (Map.Entry e : s.getFieldSchemas()) { - int compare = compare(r1.get(i), r2.get(i), e.getValue()); - if (compare != 0) return compare; - i++; + Iterator fields = s.getFields().values().iterator(); + for (int i = 0; fields.hasNext(); i++) { + Field f = fields.next(); + if (f.order() == Field.Order.IGNORE) + continue; // ignore this field + int compare = compare(r1.get(i), r2.get(i), f.schema()); + if (compare != 0) // not equal + return f.order() == Field.Order.DESCENDING ? -compare : compare; } return 0; case ENUM: Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java?rev=811128&r1=811127&r2=811128&view=diff ============================================================================== --- hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java (original) +++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java Thu Sep 3 21:19:46 2009 @@ -111,15 +111,23 @@ @Test public void testRecord() throws Exception { - String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":" - +"[{\"name\":\"f\",\"type\":\"int\"},{\"name\":\"g\",\"type\":\"int\"}]}"; + String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":[" + +"{\"name\":\"f\",\"type\":\"int\",\"order\":\"ignore\"}," + +"{\"name\":\"g\",\"type\":\"int\",\"order\":\"descending\"}," + +"{\"name\":\"h\",\"type\":\"int\"}]}"; Schema schema = Schema.parse(recordJson); GenericData.Record r1 = new GenericData.Record(schema); - r1.put("f", 11); - r1.put("g", 12); + r1.put("f", 1); + r1.put("g", 13); + r1.put("h", 41); GenericData.Record r2 
= new GenericData.Record(schema); - r2.put("f", 11); + r2.put("f", 0); + r2.put("g", 12); + r2.put("h", 41); + check(recordJson, r1, r2); + r2.put("f", 0); r2.put("g", 13); + r2.put("h", 42); check(recordJson, r1, r2); } @@ -148,16 +156,20 @@ Simple.TestRecord s1 = new Simple.TestRecord(); Simple.TestRecord s2 = new Simple.TestRecord(); s1.name = new Utf8("foo"); - s2.name = new Utf8("foo"); - s1.kind = Simple.Kind.BAR; - s2.kind = Simple.Kind.BAR; + s1.kind = Simple.Kind.BAZ; s1.hash = new Simple.MD5(); s1.hash.bytes(new byte[] {0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5}); + s2.name = new Utf8("bar"); + s2.kind = Simple.Kind.BAR; s2.hash = new Simple.MD5(); s2.hash.bytes(new byte[] {0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,6}); check(Simple.TestRecord._SCHEMA, s1, s2, true, new SpecificDatumWriter(Simple.TestRecord._SCHEMA), SpecificData.get()); + s2.kind = Simple.Kind.BAZ; + check(Simple.TestRecord._SCHEMA, s1, s2, true, + new SpecificDatumWriter(Simple.TestRecord._SCHEMA), + SpecificData.get()); } private static void check(String schemaJson, Object o1, Object o2) Modified: hadoop/avro/trunk/src/test/schemata/simple.avpr URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/schemata/simple.avpr?rev=811128&r1=811127&r2=811128&view=diff ============================================================================== --- hadoop/avro/trunk/src/test/schemata/simple.avpr (original) +++ hadoop/avro/trunk/src/test/schemata/simple.avpr Thu Sep 3 21:19:46 2009 @@ -8,8 +8,8 @@ {"name": "TestRecord", "type": "record", "fields": [ - {"name": "name", "type": "string"}, - {"name": "kind", "type": "Kind"}, + {"name": "name", "type": "string", "order": "ignore"}, + {"name": "kind", "type": "Kind", "order": "descending"}, {"name": "hash", "type": "MD5"} ] },