avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r811128 - in /hadoop/avro/trunk: ./ src/java/org/apache/avro/ src/java/org/apache/avro/generic/ src/java/org/apache/avro/io/ src/java/org/apache/avro/specific/ src/test/java/org/apache/avro/ src/test/schemata/
Date Thu, 03 Sep 2009 21:19:49 GMT
Author: cutting
Date: Thu Sep  3 21:19:46 2009
New Revision: 811128

URL: http://svn.apache.org/viewvc?rev=811128&view=rev
Log:
AVRO-109.  Add Java support for controlling sort order.

Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
    hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
    hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
    hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
    hadoop/avro/trunk/src/test/schemata/simple.avpr

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Sep  3 21:19:46 2009
@@ -42,6 +42,11 @@
     Comparable.  The implementation is consistent with the binary
     comparator added in AVRO-108. (cutting)
 
+    AVRO-109.  Add Java support for controlling sort order via schema
+    annotations.  Record fields now support an "order" attribute whose
+    possible values are "ascending" (the default), "descending", and
+    "ignore".  (cutting)
+
   IMPROVEMENTS
 
     AVRO-71.  C++: make deserializer more generic.  (Scott Banachowski

Modified: hadoop/avro/trunk/src/java/org/apache/avro/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/Schema.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/Schema.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/Schema.java Thu Sep  3 21:19:46 2009
@@ -223,18 +223,33 @@
 
   /** A field within a record. */
   public static class Field {
+
+    /** How values of this field should be ordered when sorting records. */
+    public enum Order {
+      ASCENDING, DESCENDING, IGNORE;
+      private String name;
+      private Order() { this.name = this.name().toLowerCase(); }
+    };
+
     private int position = -1;
     private final Schema schema;
     private final JsonNode defaultValue;
+    private final Order order;
+
     public Field(Schema schema, JsonNode defaultValue) {
+      this(schema, defaultValue, Order.ASCENDING);
+    }
+    public Field(Schema schema, JsonNode defaultValue, Order order) {
       this.schema = schema;
       this.defaultValue = defaultValue;
+      this.order = order;
     }
     /** The position of this field within the record. */
     public int pos() { return position; }
     /** This field's {@link Schema}. */
     public Schema schema() { return schema; }
     public JsonNode defaultValue() { return defaultValue; }
+    public Order order() { return order; }
     public boolean equals(Object other) {
       if (other == this) return true;
       if (!(other instanceof Field)) return false;
@@ -245,6 +260,7 @@
          ? that.defaultValue == null
          : (defaultValue.equals(that.defaultValue)));
     }
+    public int hashCode() { return schema.hashCode(); }
   }
 
   private static abstract class NamedSchema extends Schema {
@@ -377,6 +393,8 @@
           gen.writeFieldName("default");
           gen.writeTree(entry.getValue().defaultValue());
         }
+        if (entry.getValue().order() != Field.Order.ASCENDING)
+          gen.writeStringField("order", entry.getValue().order().name);
         gen.writeEndObject();
       }
       gen.writeEndArray();
@@ -657,8 +675,12 @@
           if (fieldTypeNode == null)
             throw new SchemaParseException("No field type: "+field);
           Schema fieldSchema = parse(fieldTypeNode, names);
+          Field.Order order = Field.Order.ASCENDING;
+          JsonNode orderNode = field.get("order");
+          if (orderNode != null)
+            order = Field.Order.valueOf(orderNode.getTextValue().toUpperCase());
           fields.put(fieldNameNode.getTextValue(),
-                     new Field(fieldSchema, field.get("default")));
+                     new Field(fieldSchema, field.get("default"), order));
         }
         result.setFields(fields);
         return result;

Modified: hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/generic/GenericData.java Thu Sep  3 21:19:46 2009
@@ -389,10 +389,14 @@
     case RECORD:
       GenericRecord r1 = (GenericRecord)o1;
       GenericRecord r2 = (GenericRecord)o2;
-      for (Map.Entry<String, Schema> e : s.getFieldSchemas()) {
-        String field = e.getKey();
-        int compare = compare(r1.get(field), r2.get(field), e.getValue());
-        if (compare != 0) return compare;
+      for (Map.Entry<String, Field> e : s.getFields().entrySet()) {
+        Field f = e.getValue();
+        if (f.order() == Field.Order.IGNORE)
+          continue;                               // ignore this field
+        String name = e.getKey();
+        int compare = compare(r1.get(name), r2.get(name), f.schema());
+        if (compare != 0)                         // not equal
+          return f.order() == Field.Order.DESCENDING ? -compare : compare;
       }
       return 0;
     case ENUM:

Modified: hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/io/BinaryData.java Thu Sep  3 21:19:46 2009
@@ -18,17 +18,46 @@
 package org.apache.avro.io;
 
 import java.util.Map;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
 import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.generic.GenericDatumReader;
 
 /** Utilities for binary-encoded data. */
 public class BinaryData {
 
   private BinaryData() {}                      // no public ctor
 
-  private static final int GT = -1;
-  private static final int LT = -2;
+  private static class Buffer extends ByteArrayInputStream {
+    public Buffer() { super(new byte[0]); }
+    public byte[] buf() { return buf; }
+    public int pos() { return pos; }
+    public void skip(int i) { this.pos += i; }
+    public void set(byte[] buf, int pos) {
+      this.buf = buf;
+      this.pos = pos;
+      this.count = buf.length;
+    }
+  }
+
+  private static class Decoders {
+    private final Buffer b1, b2;
+    private final Decoder d1, d2;
+    public Decoders() {
+      this.b1 = new Buffer();
+      this.b2 = new Buffer();
+      this.d1 = new BinaryDecoder(b1);
+      this.d2 = new BinaryDecoder(b2);
+    }
+  }
+
+  private static final ThreadLocal<Decoders> DECODERS
+    = new ThreadLocal<Decoders>() {
+    @Override protected Decoders initialValue() { return new Decoders(); }
+  };
 
   /** Compare binary encoded data.  If equal, return zero.  If greater-than,
    * return 1, if less than return -1. Order is consistent with that of {@link
@@ -36,103 +65,107 @@
   public static int compare(byte[] b1, int s1,
                             byte[] b2, int s2,
                             Schema schema) {
-    int comp = comp(b1, s1, b2, s2, schema);          // compare
-    return (comp >= 0) ? 0 : ((comp == GT) ? 1 : -1); // decode comparison
+    Decoders decoders = DECODERS.get();
+    decoders.b1.set(b1, s1);
+    decoders.b2.set(b2, s2);
+    try {
+      return compare(decoders, schema);
+    } catch (IOException e) {
+      throw new AvroRuntimeException(e);
+    }
   }
-   
+
   /** If equal, return the number of bytes consumed.  If greater than, return
    * GT, if less than, return LT. */
-  private static int comp(byte[] b1, int s1, byte[] b2, int s2, Schema schema) {
+  private static int compare(Decoders d, Schema schema) throws IOException {
+    Decoder d1 = d.d1; Decoder d2 = d.d2;
     switch (schema.getType()) {
     case RECORD: {
-      int size = 0;
-      for (Map.Entry<String, Schema> entry : schema.getFieldSchemas()) {
-        int comp = comp(b1, s1+size, b2, s2+size, entry.getValue());
-        if (comp < 0) return comp;
-        size += comp;
+      for (Map.Entry<String, Field> entry : schema.getFields().entrySet()) {
+        Field field = entry.getValue();
+        if (field.order() == Field.Order.IGNORE) {
+          GenericDatumReader.skip(field.schema(), d1);
+          GenericDatumReader.skip(field.schema(), d2);
+          continue;
+        }
+        int c = compare(d, field.schema());
+        if (c != 0)
+          return (field.order() != Field.Order.DESCENDING) ? c : -c;
       }
-      return size;
+      return 0;
     }
     case ENUM: case INT: case LONG: {
-      long l1 = readLong(b1, s1);
-      long l2 = readLong(b2, s2);
-      return (l1 == l2) ? longSize(l1) : ((l1 > l2) ? GT : LT);
+      long l1 = d1.readLong();
+      long l2 = d2.readLong();
+      return l1 == l2 ? 0 : (l1 > l2 ? 1 : -1);
     }
     case ARRAY: {
       long i = 0;                                 // position in array
       long r1 = 0, r2 = 0;                        // remaining in current block
       long l1 = 0, l2 = 0;                        // total array length
-      int size1 = 0, size2 = 0;                   // total array size
       while (true) {
         if (r1 == 0) {                            // refill blocks(s)
-          r1 = readLong(b1, s1+size1);
-          size1 += longSize(r1);
+          r1 = d1.readLong();
+          if (r1 < 0) { r1 = -r1; d1.readLong(); }
           l1 += r1;
         }
         if (r2 == 0) {
-          r2 = readLong(b2, s2+size2);
-          size2 += longSize(r2);
+          r2 = d2.readLong();
+          if (r2 < 0) { r2 = -r2; d2.readLong(); }
           l2 += r2;
         }
         if (r1 == 0 || r2 == 0)                   // empty block: done
-          return (l1 == l2) ? size1 : ((l1 > l2) ? GT : LT);
+          return (l1 == l2) ? 0 : ((l1 > l2) ? 1 : -1);
         long l = Math.min(l1, l2);
         while (i < l) {                           // compare to end of block
-          int comp = comp(b1, s1+size1, b2, s2+size2, schema.getElementType());
-          if (comp < 0) return comp;
-          size1 += comp;
-          size2 += comp;
+          int c = compare(d, schema.getElementType());
+          if (c != 0) return c;
           i++; r1--; r2--;
         }
       }
-      }
+    }
     case MAP:
       throw new AvroRuntimeException("Can't compare maps!");
     case UNION: {
-      int i1 = readInt(b1, s1);
-      int i2 = readInt(b2, s2);
+      int i1 = d1.readInt();
+      int i2 = d2.readInt();
       if (i1 == i2) {
-        int size = intSize(i1);
-        return comp(b1, s1+size, b2, s2+size, schema.getTypes().get(i1));
+        return compare(d, schema.getTypes().get(i1));
       } else {
-        return (i1 > i2) ? GT : LT;
+        return i1 - i2;
       }
     }
     case FIXED: {
       int size = schema.getFixedSize();
-      int c = compareBytes(b1, s1, size, b2, s2, size);
-      return (c == 0) ? size : ((c > 0) ? GT : LT);
+      int c = compareBytes(d.b1.buf(), d.b1.pos(), size,
+                           d.b2.buf(), d.b2.pos(), size);
+      d.b1.skip(size);
+      d.b2.skip(size);
+      return c;
     }
     case STRING: case BYTES: {
-      int l1 = readInt(b1, s1);
-      int l2 = readInt(b2, s2);
-      int size1 = intSize(l1);
-      int size2 = intSize(l2);
-      int c = compareBytes(b1, s1+size1, l1, b2, s2+size2, l2);
-      return (c == 0) ? size1+l1 : ((c > 0) ? GT : LT);
+      int l1 = d1.readInt();
+      int l2 = d2.readInt();
+      int c = compareBytes(d.b1.buf(), d.b1.pos(), l1,
+                           d.b2.buf(), d.b2.pos(), l2);
+      d.b1.skip(l1);
+      d.b2.skip(l2);
+      return c;
     }
     case FLOAT: {
-      int n1 = 0, n2 = 0;
-      for (int i = 0, shift = 0; i < 4; i++, shift += 8) {
-        n1 |= (b1[s1+i] & 0xff) << shift;
-        n2 |= (b2[s2+i] & 0xff) << shift;
-      }
-      float f1 = Float.intBitsToFloat(n1);
-      float f2 = Float.intBitsToFloat(n2);
-      return (f1 == f2) ? 4 : ((f1 > f2) ? GT : LT);
+      float f1 = d1.readFloat();
+      float f2 = d2.readFloat();
+      return (f1 == f2) ? 0 : ((f1 > f2) ? 1 : -1);
     }
     case DOUBLE: {
-      long n1 = 0, n2 = 0;
-      for (int i = 0, shift = 0; i < 8; i++, shift += 8) {
-        n1 |= (b1[s1+i] & 0xffL) << shift;
-        n2 |= (b2[s2+i] & 0xffL) << shift;
-      }
-      double d1 = Double.longBitsToDouble(n1);
-      double d2 = Double.longBitsToDouble(n2);
-      return (d1 == d2) ? 8 : ((d1 > d2) ? GT : LT);
+      double f1 = d1.readDouble();
+      double f2 = d2.readDouble();
+      return (f1 == f2) ? 0 : ((f1 > f2) ? 1 : -1);
     }
     case BOOLEAN:
-      return b1[s1] == b2[s2] ? 1 : ((b1[s1] > b2[s2]) ? GT : LT);
+      boolean b1 = d1.readBoolean();
+      boolean b2 = d2.readBoolean();
+      return (b1 == b2) ? 0 : (b1 ? 1 : -1);
     case NULL:
       return 0;
     default:
@@ -156,36 +189,4 @@
     return l1 - l2;
   }
 
-  private static int readInt(byte[] b, int s) {
-    long l = readLong(b, s);
-    if (l < Integer.MIN_VALUE || Integer.MAX_VALUE < l)
-      throw new AvroRuntimeException("Integer overflow.");
-    return (int)l;
-  }
-
-
-  private static long readLong(byte[] buffer, int s) {
-    long n = 0;
-    for (int shift = 0; ; shift += 7) {
-      long b = buffer[s++];
-      n |= (b & 0x7F) << shift;
-      if ((b & 0x80) == 0) {
-        break;
-      }
-    }
-    return (n >>> 1) ^ -(n & 1);                  // back to two's-complement
-  }
-
-  private static int intSize(int i) { return longSize(i); }
-
-  private static int longSize(long n) {
-    int size = 1;
-    n = (n << 1) ^ (n >> 63);                     // move sign to low-order bit
-    while ((n & ~0x7F) != 0) {
-      size++;
-      n >>>= 7;
-    }
-    return size;
-  }
-
 }

Modified: hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/specific/SpecificData.java Thu Sep  3 21:19:46 2009
@@ -17,9 +17,10 @@
  */
 package org.apache.avro.specific;
 
-import java.util.Map;
+import java.util.Iterator;
 
 import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
 import org.apache.avro.reflect.ReflectData;
 
 /** Utilities for generated Java classes and interfaces. */
@@ -48,11 +49,14 @@
     case RECORD:
       SpecificRecord r1 = (SpecificRecord)o1;
       SpecificRecord r2 = (SpecificRecord)o2;
-      int i = 0;
-      for (Map.Entry<String, Schema> e : s.getFieldSchemas()) {
-        int compare = compare(r1.get(i), r2.get(i), e.getValue());
-        if (compare != 0) return compare;
-        i++;
+      Iterator<Field> fields = s.getFields().values().iterator();
+      for (int i = 0; fields.hasNext(); i++) {
+        Field f = fields.next();
+        if (f.order() == Field.Order.IGNORE)
+          continue;                               // ignore this field
+        int compare = compare(r1.get(i), r2.get(i), f.schema());
+        if (compare != 0)                         // not equal
+          return f.order() == Field.Order.DESCENDING ? -compare : compare;
       }
       return 0;
     case ENUM:

Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestCompare.java Thu Sep  3 21:19:46 2009
@@ -111,15 +111,23 @@
 
   @Test
   public void testRecord() throws Exception {
-    String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":"
-      +"[{\"name\":\"f\",\"type\":\"int\"},{\"name\":\"g\",\"type\":\"int\"}]}";
+    String recordJson = "{\"type\":\"record\", \"name\":\"Test\", \"fields\":["
+      +"{\"name\":\"f\",\"type\":\"int\",\"order\":\"ignore\"},"
+      +"{\"name\":\"g\",\"type\":\"int\",\"order\":\"descending\"},"
+      +"{\"name\":\"h\",\"type\":\"int\"}]}";
     Schema schema = Schema.parse(recordJson);
     GenericData.Record r1 = new GenericData.Record(schema);
-    r1.put("f", 11);
-    r1.put("g", 12);
+    r1.put("f", 1);
+    r1.put("g", 13);
+    r1.put("h", 41);
     GenericData.Record r2 = new GenericData.Record(schema);
-    r2.put("f", 11);
+    r2.put("f", 0);
+    r2.put("g", 12);
+    r2.put("h", 41);
+    check(recordJson, r1, r2);
+    r2.put("f", 0);
     r2.put("g", 13);
+    r2.put("h", 42);
     check(recordJson, r1, r2);
   }
 
@@ -148,16 +156,20 @@
     Simple.TestRecord s1 = new Simple.TestRecord();
     Simple.TestRecord s2 = new Simple.TestRecord();
     s1.name = new Utf8("foo");
-    s2.name = new Utf8("foo");
-    s1.kind = Simple.Kind.BAR;
-    s2.kind = Simple.Kind.BAR;
+    s1.kind = Simple.Kind.BAZ;
     s1.hash = new Simple.MD5();
     s1.hash.bytes(new byte[] {0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,5});
+    s2.name = new Utf8("bar");
+    s2.kind = Simple.Kind.BAR;
     s2.hash = new Simple.MD5();
     s2.hash.bytes(new byte[] {0,1,2,3,4,5,6,7,8,9,0,1,2,3,4,6});
     check(Simple.TestRecord._SCHEMA, s1, s2, true,
           new SpecificDatumWriter(Simple.TestRecord._SCHEMA),
           SpecificData.get());
+    s2.kind = Simple.Kind.BAZ;
+    check(Simple.TestRecord._SCHEMA, s1, s2, true,
+          new SpecificDatumWriter(Simple.TestRecord._SCHEMA),
+          SpecificData.get());
   }  
 
   private static void check(String schemaJson, Object o1, Object o2)

Modified: hadoop/avro/trunk/src/test/schemata/simple.avpr
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/schemata/simple.avpr?rev=811128&r1=811127&r2=811128&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/schemata/simple.avpr (original)
+++ hadoop/avro/trunk/src/test/schemata/simple.avpr Thu Sep  3 21:19:46 2009
@@ -8,8 +8,8 @@
 
      {"name": "TestRecord", "type": "record",
       "fields": [
-          {"name": "name", "type": "string"},
-          {"name": "kind", "type": "Kind"},
+          {"name": "name", "type": "string", "order": "ignore"},
+          {"name": "kind", "type": "Kind", "order": "descending"},
           {"name": "hash", "type": "MD5"}
       ]
      },



Mime
View raw message