avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1501579 - in /avro/trunk: ./ lang/java/avro/src/main/java/org/apache/avro/generic/ lang/java/avro/src/main/java/org/apache/avro/reflect/ lang/java/avro/src/main/java/org/apache/avro/specific/ lang/java/mapred/src/main/java/org/apache/avro/...
Date Tue, 09 Jul 2013 21:57:44 GMT
Author: cutting
Date: Tue Jul  9 21:57:43 2013
New Revision: 1501579

URL: http://svn.apache.org/r1501579
Log:
AVRO-1351. Extend SortedKeyValueFile to support data models besides generic.

Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
    avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
    avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java
    avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java
    avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
    avro/trunk/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Jul  9 21:57:43 2013
@@ -67,6 +67,9 @@ Trunk (not yet released)
     AVRO-1290. Handling NaN and positive and negative infinities in
     C++ Json (Daniel Russel via thiru)
 
+    AVRO-1351. Extend SortedKeyValueFile to support data models
+    besides generic. (cutting)
+
   BUG FIXES
 
     AVRO-1296. Python: Fix schemas retrieved from protocol types

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java Tue Jul  9 21:57:43 2013
@@ -42,6 +42,7 @@ import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.parsing.ResolvingGrammarGenerator;
 import org.apache.avro.util.Utf8;
 
@@ -338,6 +339,11 @@ public class GenericData {
     return new GenericDatumReader(schema, schema, this);
   }
 
+  /** Returns a {@link DatumWriter} for this kind of data. */
+  public DatumWriter createDatumWriter(Schema schema) {
+    return new GenericDatumWriter(schema, this);
+  }
+
   /** Returns true if a Java datum matches a schema. */
   public boolean validate(Schema schema, Object datum) {
     switch (schema.getType()) {
@@ -929,13 +935,17 @@ public class GenericData {
       case NULL:
         return null;
       case RECORD:
-        IndexedRecord recordValue = (IndexedRecord) value;
-        IndexedRecord recordCopy = (IndexedRecord) newRecord(null, schema);
-        for (Field field : schema.getFields()) {
-          recordCopy.put(field.pos(), 
-              deepCopy(field.schema(), recordValue.get(field.pos())));
+        Object oldState = getRecordState(value, schema);
+        Object newRecord = newRecord(null, schema);
+        Object newState = getRecordState(newRecord, schema);
+        for (Field f : schema.getFields()) {
+          int pos = f.pos();
+          String name = f.name();
+          Object newValue = deepCopy(f.schema(),
+                                     getField(value, name, pos, oldState));
+          setField(newRecord, name, pos, newValue, newState);
         }
-        return (T)recordCopy;
+        return (T)newRecord;
       case STRING:
         // Strings are immutable
         if (value instanceof String) {

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java Tue Jul  9 21:57:43 2013
@@ -48,6 +48,7 @@ import org.apache.avro.generic.GenericFi
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.avro.io.BinaryData;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 import org.apache.avro.specific.FixedSize;
 import org.apache.avro.specific.SpecificData;
 import org.codehaus.jackson.JsonNode;
@@ -100,6 +101,11 @@ public class ReflectData extends Specifi
   }
 
   @Override
+  public DatumWriter createDatumWriter(Schema schema) {
+    return new ReflectDatumWriter(schema, this);
+  }
+
+  @Override
   public void setField(Object record, String name, int position, Object o) {
     setField(record, name, position, o, null);
   }

Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java (original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/specific/SpecificData.java Tue Jul  9 21:57:43 2013
@@ -36,6 +36,7 @@ import org.apache.avro.AvroTypeException
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 
 /** Utilities for generated Java classes and interfaces. */
 public class SpecificData extends GenericData {
@@ -84,6 +85,11 @@ public class SpecificData extends Generi
     return new SpecificDatumReader(schema, schema, this);
   }
 
+  @Override
+  public DatumWriter createDatumWriter(Schema schema) {
+    return new SpecificDatumWriter(schema, this);
+  }
+
   /** Return the singleton instance. */
   public static SpecificData get() { return INSTANCE; }
 

Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/hadoop/file/SortedKeyValueFile.java Tue Jul  9 21:57:43 2013
@@ -28,9 +28,8 @@ import java.util.TreeMap;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.file.DataFileReader;
@@ -89,6 +88,9 @@ public class SortedKeyValueFile {
     /** The key schema for the data file. */
     private final Schema mKeySchema;
 
+    /** The model for the data. */
+    private GenericData model;
+
     /** A class to encapsulate the options of a Reader. */
     public static class Options {
       /** The configuration. */
@@ -103,6 +105,9 @@ public class SortedKeyValueFile {
       /** The reader schema for the value. */
       private Schema mValueSchema;
 
+      /** The model for the data. */
+      private GenericData model = SpecificData.get();
+
       /**
        * Sets the configuration.
        *
@@ -182,6 +187,18 @@ public class SortedKeyValueFile {
       public Schema getValueSchema() {
         return mValueSchema;
       }
+
+      /** Set the data model. */
+      public Options withDataModel(GenericData model) {
+        this.model = model;
+        return this;
+      }
+
+      /** Return the data model. */
+      public GenericData getDataModel() {
+        return model;
+      }
+
     }
 
     /**
@@ -192,6 +209,7 @@ public class SortedKeyValueFile {
      */
     public Reader(Options options) throws IOException {
       mKeySchema = options.getKeySchema();
+      this.model = options.getDataModel();
 
       // Load the whole index file into memory.
       Path indexFilePath = new Path(options.getPath(), INDEX_FILENAME);
@@ -202,10 +220,12 @@ public class SortedKeyValueFile {
       Path dataFilePath = new Path(options.getPath(), DATA_FILENAME);
       LOG.debug("Loading the data file " + dataFilePath);
       Schema recordSchema = AvroKeyValue.getSchema(mKeySchema, options.getValueSchema());
-      DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(recordSchema);
+      DatumReader<GenericRecord> datumReader =
+        model.createDatumReader(recordSchema);
       mDataFileReader =
         new DataFileReader<GenericRecord>
         (new FsInput(dataFilePath, options.getConfiguration()), datumReader);
+      
     }
 
     /**
@@ -235,7 +255,7 @@ public class SortedKeyValueFile {
       Iterator<AvroKeyValue<K, V>> iter = iterator();
       while (iter.hasNext()) {
         AvroKeyValue<K, V> record = iter.next();
-        int comparison = GenericData.get().compare(record.getKey(), key, mKeySchema);
+        int comparison = model.compare(record.getKey(), key, mKeySchema);
         if (0 == comparison) {
           // We've found it!
           LOG.debug("Found record for key " + key);
@@ -281,9 +301,9 @@ public class SortedKeyValueFile {
      * @param keySchema The reader schema for the key.
      * @throws IOException If there is an error.
      */
-    private static <K> NavigableMap<K, Long> loadIndexFile(
+    private <K> NavigableMap<K, Long> loadIndexFile(
         Configuration conf, Path path, Schema keySchema) throws IOException {
-      DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(
+      DatumReader<GenericRecord> datumReader = model.createDatumReader(
           AvroKeyValue.getSchema(keySchema, Schema.create(Schema.Type.LONG)));
       DataFileReader<GenericRecord> fileReader = new DataFileReader<GenericRecord>(
           new FsInput(path, conf), datumReader);
@@ -331,6 +351,9 @@ public class SortedKeyValueFile {
     /** The schema of the index file records. */
     private final Schema mIndexSchema;
 
+    /** The model for the data. */
+    private GenericData model;
+
     /** The writer for the data file. */
     private final DataFileWriter<GenericRecord> mDataFileWriter;
 
@@ -363,6 +386,9 @@ public class SortedKeyValueFile {
       /** The number of records between indexed entries. */
       private int mIndexInterval = 128;
 
+      /** The model for the data. */
+      private GenericData model = SpecificData.get();
+
       /**
        * Sets the key schema.
        *
@@ -465,6 +491,17 @@ public class SortedKeyValueFile {
       public int getIndexInterval() {
         return mIndexInterval;
       }
+
+      /** Set the data model. */
+      public Options withDataModel(GenericData model) {
+        this.model = model;
+        return this;
+      }
+
+      /** Return the data model. */
+      public GenericData getDataModel() {
+        return model;
+      }
     }
 
     /**
@@ -474,6 +511,8 @@ public class SortedKeyValueFile {
      * @throws IOException If there is an error.
      */
     public Writer(Options options) throws IOException {
+      this.model = options.getDataModel();
+
       if (null == options.getConfiguration()) {
         throw new IllegalArgumentException("Configuration may not be null");
       }
@@ -504,8 +543,8 @@ public class SortedKeyValueFile {
       Path dataFilePath = new Path(options.getPath(), DATA_FILENAME);
       LOG.debug("Creating writer for avro data file: " + dataFilePath);
       mRecordSchema = AvroKeyValue.getSchema(mKeySchema, mValueSchema);
-      DatumWriter<GenericRecord> datumWriter
-          = new GenericDatumWriter<GenericRecord>(mRecordSchema);
+      DatumWriter<GenericRecord> datumWriter =
+        model.createDatumWriter(mRecordSchema);
       OutputStream dataOutputStream = fileSystem.create(dataFilePath);
       mDataFileWriter = new DataFileWriter<GenericRecord>(datumWriter)
           .setSyncInterval(1 << 20)  // Set the auto-sync interval sufficiently large, since
@@ -516,8 +555,8 @@ public class SortedKeyValueFile {
       Path indexFilePath = new Path(options.getPath(), INDEX_FILENAME);
       LOG.debug("Creating writer for avro index file: " + indexFilePath);
       mIndexSchema = AvroKeyValue.getSchema(mKeySchema, Schema.create(Schema.Type.LONG));
-      DatumWriter<GenericRecord> indexWriter
-          = new GenericDatumWriter<GenericRecord>(mIndexSchema);
+      DatumWriter<GenericRecord> indexWriter =
+        model.createDatumWriter(mIndexSchema);
       OutputStream indexOutputStream = fileSystem.create(indexFilePath);
       mIndexFileWriter = new DataFileWriter<GenericRecord>(indexWriter)
           .create(mIndexSchema, indexOutputStream);
@@ -532,11 +571,11 @@ public class SortedKeyValueFile {
      */
     public void append(K key, V value) throws IOException {
       // Make sure the keys are inserted in sorted order.
-      if (null != mPreviousKey && GenericData.get().compare(key, mPreviousKey, mKeySchema) < 0) {
+      if (null != mPreviousKey && model.compare(key, mPreviousKey, mKeySchema) < 0) {
         throw new IllegalArgumentException("Records must be inserted in sorted key order."
             + " Attempted to insert key " + key + " after " + mPreviousKey + ".");
       }
-      mPreviousKey = GenericData.get().deepCopy(mKeySchema, key);
+      mPreviousKey = model.deepCopy(mKeySchema, key);
 
       // Construct the data record.
       AvroKeyValue<K, V> dataRecord

Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/hadoop/file/TestSortedKeyValueFile.java Tue Jul  9 21:57:43 2013
@@ -28,6 +28,7 @@ import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.hadoop.io.AvroKeyValue;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.file.FileReader;
@@ -198,4 +199,71 @@ public class TestSortedKeyValueFile {
       reader.close();
     }
   }
+
+  public static class Stringy implements Comparable<Stringy> {
+    private String s;
+    public Stringy() {};
+    public Stringy(String s) { this.s = s; }
+    @Override public String toString() { return s; }
+    @Override public int hashCode() { return s.hashCode(); }
+    @Override public boolean equals(Object that) {
+      return this.s.equals(that.toString());
+    }
+    @Override public int compareTo(Stringy that) {
+      return this.s.compareTo(that.s);
+    }
+  }
+
+  @Test public void testAlternateModel() throws Exception {
+    LOG.debug("Writing some reflect records...");
+
+    ReflectData model = ReflectData.get();
+
+    Configuration conf = new Configuration();
+    SortedKeyValueFile.Writer.Options options
+      = new SortedKeyValueFile.Writer.Options()
+      .withKeySchema(model.getSchema(Stringy.class))
+      .withValueSchema(model.getSchema(Stringy.class))
+      .withConfiguration(conf)
+      .withPath(new Path(mTempDir.getRoot().getPath(), "reflect"))
+      .withDataModel(model)
+      .withIndexInterval(2);
+
+    SortedKeyValueFile.Writer<Stringy,Stringy> writer
+        = new SortedKeyValueFile.Writer<Stringy,Stringy>(options);
+
+    try {
+      writer.append(new Stringy("apple"), new Stringy("Apple"));
+      writer.append(new Stringy("banana"), new Stringy("Banana"));
+      writer.append(new Stringy("carrot"), new Stringy("Carrot"));
+      writer.append(new Stringy("durian"), new Stringy("Durian"));
+    } finally {
+      writer.close();
+    }
+
+    LOG.debug("Reading the file back using a reader...");
+    SortedKeyValueFile.Reader.Options readerOptions =
+      new SortedKeyValueFile.Reader.Options()
+      .withKeySchema(model.getSchema(Stringy.class))
+      .withValueSchema(model.getSchema(Stringy.class))
+      .withConfiguration(conf)
+      .withPath(new Path(mTempDir.getRoot().getPath(), "reflect"))
+      .withDataModel(model);
+
+    SortedKeyValueFile.Reader<Stringy,Stringy> reader
+      = new SortedKeyValueFile.Reader<Stringy,Stringy>(readerOptions);
+
+    try {
+      assertEquals(new Stringy("Carrot"), reader.get(new Stringy("carrot")));
+      assertEquals(new Stringy("Banana"), reader.get(new Stringy("banana")));
+      assertNull(reader.get(new Stringy("a-vegetable")));
+      assertNull(reader.get(new Stringy("beet")));
+      assertNull(reader.get(new Stringy("zzz")));
+    } finally {
+      reader.close();
+    }
+
+  }
+
+
 }

Modified: avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java (original)
+++ avro/trunk/lang/java/protobuf/src/main/java/org/apache/avro/protobuf/ProtobufData.java Tue Jul  9 21:57:43 2013
@@ -32,6 +32,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -66,6 +67,11 @@ public class ProtobufData extends Generi
   }
 
   @Override
+  public DatumWriter createDatumWriter(Schema schema) {
+    return new ProtobufDatumWriter(schema, this);
+  }
+
+  @Override
   public void setField(Object r, String n, int pos, Object o) {
     setField(r, n, pos, o, getRecordState(r, getSchema(r.getClass())));
   }

Modified: avro/trunk/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java?rev=1501579&r1=1501578&r2=1501579&view=diff
==============================================================================
--- avro/trunk/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java (original)
+++ avro/trunk/lang/java/thrift/src/main/java/org/apache/avro/thrift/ThriftData.java Tue Jul  9 21:57:43 2013
@@ -30,6 +30,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
 
 import org.apache.thrift.TBase;
 import org.apache.thrift.TEnum;
@@ -63,6 +64,11 @@ public class ThriftData extends GenericD
   }
 
   @Override
+  public DatumWriter createDatumWriter(Schema schema) {
+    return new ThriftDatumWriter(schema, this);
+  }
+
+  @Override
   public void setField(Object r, String n, int pos, Object o) {
     setField(r, n, pos, o, getRecordState(r, getSchema(r.getClass())));
   }



Mime
View raw message