parquet-commits mailing list archives

From b...@apache.org
Subject [1/2] parquet-mr git commit: PARQUET-358: Add support for Avro's logical types API.
Date Wed, 20 Apr 2016 15:41:29 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master 82b8ecc32 -> 6b24a1d1b


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
new file mode 100644
index 0000000..d2f80ed
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestCircularReferences.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.parquet.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This class is based on org.apache.avro.TestCircularReferences
+ *
+ * The main difference between this class and the Avro version is that this one
+ * uses a place-holder schema for the circular reference from Child to Parent.
+ * This avoids creating a schema for Parent that references itself, which
+ * can't be converted to a Parquet schema. The place-holder schema must also
+ * have a referenceable logical type.
+ */
+public class TestCircularReferences {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static class Reference extends LogicalType {
+    private static final String REFERENCE = "reference";
+    private static final String REF_FIELD_NAME = "ref-field-name";
+
+    private final String refFieldName;
+
+    public Reference(String refFieldName) {
+      super(REFERENCE);
+      this.refFieldName = refFieldName;
+    }
+
+    public Reference(Schema schema) {
+      super(REFERENCE);
+      this.refFieldName = schema.getProp(REF_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(REF_FIELD_NAME, refFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCE;
+    }
+
+    public String getRefFieldName() {
+      return refFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      if (schema.getField(refFieldName) == null) {
+        throw new IllegalArgumentException("Invalid field name for reference field: " + refFieldName);
+      }
+    }
+  }
+
+  public static class Referenceable extends LogicalType {
+    private static final String REFERENCEABLE = "referenceable";
+    private static final String ID_FIELD_NAME = "id-field-name";
+
+    private final String idFieldName;
+
+    public Referenceable(String idFieldName) {
+      super(REFERENCEABLE);
+      this.idFieldName = idFieldName;
+    }
+
+    public Referenceable(Schema schema) {
+      super(REFERENCEABLE);
+      this.idFieldName = schema.getProp(ID_FIELD_NAME);
+    }
+
+    @Override
+    public Schema addToSchema(Schema schema) {
+      super.addToSchema(schema);
+      schema.addProp(ID_FIELD_NAME, idFieldName);
+      return schema;
+    }
+
+    @Override
+    public String getName() {
+      return REFERENCEABLE;
+    }
+
+    public String getIdFieldName() {
+      return idFieldName;
+    }
+
+    @Override
+    public void validate(Schema schema) {
+      super.validate(schema);
+      Schema.Field idField = schema.getField(idFieldName);
+      if (idField == null || idField.schema().getType() != Schema.Type.LONG) {
+        throw new IllegalArgumentException("Invalid ID field: " + idFieldName + ": " + idField);
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void addReferenceTypes() {
+    LogicalTypes.register(Referenceable.REFERENCEABLE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Referenceable(schema);
+      }
+    });
+    LogicalTypes.register(Reference.REFERENCE, new LogicalTypes.LogicalTypeFactory() {
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return new Reference(schema);
+      }
+    });
+  }
+
+  public static class ReferenceManager {
+    private interface Callback {
+      void set(Object referenceable);
+    }
+
+    private final Map<Long, Object> references = new HashMap<Long, Object>();
+    private final Map<Object, Long> ids = new IdentityHashMap<Object, Long>();
+    private final Map<Long, List<Callback>> callbacksById = new HashMap<Long, List<Callback>>();
+    private final ReferenceableTracker tracker = new ReferenceableTracker();
+    private final ReferenceHandler handler = new ReferenceHandler();
+
+    public ReferenceableTracker getTracker() {
+      return tracker;
+    }
+
+    public ReferenceHandler getHandler() {
+      return handler;
+    }
+
+    public class ReferenceableTracker extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Referenceable.REFERENCEABLE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // read side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        references.put(id, value);
+
+        // call any callbacks waiting to resolve this id
+        List<Callback> callbacks = callbacksById.get(id);
+        if (callbacks != null) {
+          for (Callback callback : callbacks) {
+            callback.set(value);
+          }
+        }
+
+        return value;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        // write side
+        long id = getId(value, schema);
+
+        // keep track of this for later references
+        //references.put(id, value);
+        ids.put(value, id);
+
+        return value;
+      }
+
+      private long getId(IndexedRecord referenceable, Schema schema) {
+        Referenceable info = (Referenceable) schema.getLogicalType();
+        int idField = schema.getField(info.getIdFieldName()).pos();
+        return (Long) referenceable.get(idField);
+      }
+    }
+
+    public class ReferenceHandler extends Conversion<IndexedRecord> {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Class<IndexedRecord> getConvertedType() {
+        return (Class) Record.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return Reference.REFERENCE;
+      }
+
+      @Override
+      public IndexedRecord fromRecord(final IndexedRecord record, Schema schema, LogicalType type) {
+        // read side: resolve the record or save a callback
+        final Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+
+        Long id = (Long) record.get(refField.pos());
+        if (id != null) {
+          if (references.containsKey(id)) {
+            record.put(refField.pos(), references.get(id));
+
+          } else {
+            List<Callback> callbacks = callbacksById.get(id);
+            if (callbacks == null) {
+              callbacks = new ArrayList<Callback>();
+              callbacksById.put(id, callbacks);
+            }
+            // add a callback to resolve this reference when the id is available
+            callbacks.add(new Callback() {
+              @Override
+              public void set(Object referenceable) {
+                record.put(refField.pos(), referenceable);
+              }
+            });
+          }
+        }
+
+        return record;
+      }
+
+      @Override
+      public IndexedRecord toRecord(IndexedRecord record, Schema schema, LogicalType type) {
+        // write side: replace a referenced field with its id
+        Schema.Field refField = schema.getField(((Reference) type).getRefFieldName());
+        IndexedRecord referenced = (IndexedRecord) record.get(refField.pos());
+        if (referenced == null) {
+          return record;
+        }
+
+        // hijack the field to return the id instead of the ref
+        return new HijackingIndexedRecord(record, refField.pos(), ids.get(referenced));
+      }
+    }
+
+    private static class HijackingIndexedRecord implements IndexedRecord {
+      private final IndexedRecord wrapped;
+      private final int index;
+      private final Object data;
+
+      public HijackingIndexedRecord(IndexedRecord wrapped, int index, Object data) {
+        this.wrapped = wrapped;
+        this.index = index;
+        this.data = data;
+      }
+
+      @Override
+      public void put(int i, Object v) {
+        throw new RuntimeException("[BUG] This is a read-only class.");
+      }
+
+      @Override
+      public Object get(int i) {
+        if (i == index) {
+          return data;
+        }
+        return wrapped.get(i);
+      }
+
+      @Override
+      public Schema getSchema() {
+        return wrapped.getSchema();
+      }
+    }
+  }
+
+  @Test
+  public void test() throws IOException {
+    ReferenceManager manager = new ReferenceManager();
+    GenericData model = new GenericData();
+    model.addLogicalTypeConversion(manager.getTracker());
+    model.addLogicalTypeConversion(manager.getHandler());
+
+    Schema parentSchema = Schema.createRecord("Parent", null, null, false);
+
+    Schema placeholderSchema = Schema.createRecord("Placeholder", null, null, false);
+    List<Schema.Field> placeholderFields = new ArrayList<Schema.Field>();
+    placeholderFields.add( // at least one field is needed to be a valid schema
+        new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    placeholderSchema.setFields(placeholderFields);
+
+    Referenceable idRef = new Referenceable("id");
+
+    Schema parentRefSchema = Schema.createUnion(
+        Schema.create(Schema.Type.NULL),
+        Schema.create(Schema.Type.LONG),
+        idRef.addToSchema(placeholderSchema));
+
+    Reference parentRef = new Reference("parent");
+
+    List<Schema.Field> childFields = new ArrayList<Schema.Field>();
+    childFields.add(new Schema.Field("c", Schema.create(Schema.Type.STRING), null, null));
+    childFields.add(new Schema.Field("parent", parentRefSchema, null, null));
+    Schema childSchema = parentRef.addToSchema(
+        Schema.createRecord("Child", null, null, false, childFields));
+
+    List<Schema.Field> parentFields = new ArrayList<Schema.Field>();
+    parentFields.add(new Schema.Field("id", Schema.create(Schema.Type.LONG), null, null));
+    parentFields.add(new Schema.Field("p", Schema.create(Schema.Type.STRING), null, null));
+    parentFields.add(new Schema.Field("child", childSchema, null, null));
+    parentSchema.setFields(parentFields);
+
+    Schema schema = idRef.addToSchema(parentSchema);
+
+    System.out.println("Schema: " + schema.toString(true));
+
+    Record parent = new Record(schema);
+    parent.put("id", 1L);
+    parent.put("p", "parent data!");
+
+    Record child = new Record(childSchema);
+    child.put("c", "child data!");
+    child.put("parent", parent);
+
+    parent.put("child", child);
+
+    // serialization round trip
+    File data = AvroTestUtil.write(temp, model, schema, parent);
+    List<Record> records = AvroTestUtil.read(model, schema, data);
+
+    Record actual = records.get(0);
+
+    // because the record is a recursive structure, equals won't work
+    Assert.assertEquals("Should correctly read back the parent id",
+        1L, actual.get("id"));
+    Assert.assertEquals("Should correctly read back the parent data",
+        new Utf8("parent data!"), actual.get("p"));
+
+    Record actualChild = (Record) actual.get("child");
+    Assert.assertEquals("Should correctly read back the child data",
+        new Utf8("child data!"), actualChild.get("c"));
+    Object childParent = actualChild.get("parent");
+    Assert.assertTrue("Should have a parent Record object",
+        childParent instanceof Record);
+
+    Record childParentRecord = (Record) actualChild.get("parent");
+    Assert.assertEquals("Should have the right parent id",
+        1L, childParentRecord.get("id"));
+    Assert.assertEquals("Should have the right parent data",
+        new Utf8("parent data!"), childParentRecord.get("p"));
+  }
+}
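
A minimal sketch (not part of the commit) of how the reference-aware model
above plugs into the Parquet writer and reader. The builder calls are the
same AvroParquetWriter/AvroParquetReader APIs exercised by TestReadWrite
later in this commit; the Path and the schema/parent values built in test()
are assumed to be in scope.

  static IndexedRecord roundTrip(Path path, GenericData model, Schema schema,
                                 IndexedRecord parent) throws IOException {
    // model carries the tracker and handler conversions registered above
    ParquetWriter<IndexedRecord> writer = AvroParquetWriter
        .<IndexedRecord>builder(path)
        .withDataModel(model)
        .withSchema(schema)
        .build();
    writer.write(parent);
    writer.close();

    ParquetReader<IndexedRecord> reader = AvroParquetReader
        .<IndexedRecord>builder(path)
        .withDataModel(model)
        .disableCompatibility()   // use the logical-types read path
        .build();
    IndexedRecord copy = reader.read();
    reader.close();
    return copy;
  }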

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
new file mode 100644
index 0000000..6809fff
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestGenericLogicalTypes.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.avro.Schema.Type.STRING;
+import static org.apache.parquet.avro.AvroTestUtil.field;
+import static org.apache.parquet.avro.AvroTestUtil.instance;
+import static org.apache.parquet.avro.AvroTestUtil.optionalField;
+import static org.apache.parquet.avro.AvroTestUtil.read;
+import static org.apache.parquet.avro.AvroTestUtil.record;
+
+/**
+ * This class is based on org.apache.avro.generic.TestGenericLogicalTypes
+ */
+public class TestGenericLogicalTypes {
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static final GenericData GENERIC = new GenericData();
+  public static final LogicalType DECIMAL_9_2 = LogicalTypes.decimal(9, 2);
+  public static final BigDecimal D1 = new BigDecimal("-34.34");
+  public static final BigDecimal D2 = new BigDecimal("117230.00");
+
+  @BeforeClass
+  public static void addDecimalAndUUID() {
+    GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
+    GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion());
+  }
+
+  private <T> List<T> getFieldValues(Collection<GenericRecord> records, String field,
+                                     Class<T> expectedClass) {
+    List<T> values = new ArrayList<T>();
+    for (GenericRecord record : records) {
+      values.add(expectedClass.cast(record.get(field)));
+    }
+    return values;
+  }
+
+  @Test
+  public void testReadUUID() throws IOException {
+    Schema uuidSchema = record("R",
+        field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+    Schema stringSchema = record("R", field("uuid", Schema.create(STRING)));
+    GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString());
+    GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString());
+
+    File test = write(stringSchema, s1, s2);
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        Arrays.asList(u1, u2), read(GENERIC, uuidSchema, test));
+  }
+
+  @Test
+  public void testWriteUUIDReadStringSchema() throws IOException {
+    Schema uuidSchema = record("R",
+        field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+    Schema stringUuidSchema = Schema.create(STRING);
+    stringUuidSchema.addProp(GenericData.STRING_PROP, "String");
+    Schema stringSchema = record("R", field("uuid", stringUuidSchema));
+    GenericRecord s1 = instance(stringSchema, "uuid", u1.get("uuid").toString());
+    GenericRecord s2 = instance(stringSchema, "uuid", u2.get("uuid").toString());
+
+    File test = write(GENERIC, uuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        Arrays.asList(s1, s2), read(GENERIC, stringSchema, test));
+  }
+
+  @Test
+  public void testWriteUUIDReadStringMissingLogicalType() throws IOException {
+    Schema uuidSchema = record("R",
+        field("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(uuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(uuidSchema, "uuid", UUID.randomUUID());
+
+    GenericRecord s1 = instance(uuidSchema, "uuid", new Utf8(u1.get("uuid").toString()));
+    GenericRecord s2 = instance(uuidSchema, "uuid", new Utf8(u2.get("uuid").toString()));
+
+    File test = write(GENERIC, uuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        Arrays.asList(s1, s2), read(GenericData.get(), uuidSchema, test));
+  }
+
+  @Test
+  public void testWriteNullableUUID() throws IOException {
+    Schema nullableUuidSchema = record("R",
+        optionalField("uuid", LogicalTypes.uuid().addToSchema(Schema.create(STRING))));
+    GenericRecord u1 = instance(nullableUuidSchema, "uuid", UUID.randomUUID());
+    GenericRecord u2 = instance(nullableUuidSchema, "uuid", UUID.randomUUID());
+
+    Schema stringUuidSchema = Schema.create(STRING);
+    stringUuidSchema.addProp(GenericData.STRING_PROP, "String");
+    Schema nullableStringSchema = record("R", optionalField("uuid", stringUuidSchema));
+    GenericRecord s1 = instance(nullableStringSchema, "uuid", u1.get("uuid").toString());
+    GenericRecord s2 = instance(nullableStringSchema, "uuid", u2.get("uuid").toString());
+
+    File test = write(GENERIC, nullableUuidSchema, u1, u2);
+    Assert.assertEquals("Should read UUIDs as Strings",
+        Arrays.asList(s1, s2), read(GENERIC, nullableStringSchema, test));
+  }
+
+  @Test
+  public void testReadDecimalFixed() throws IOException {
+    Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+    Schema fixedRecord = record("R", field("dec", fixedSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(
+        Schema.createFixed("aFixed", null, null, 4));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+    List<GenericRecord> expected = Arrays.asList(r1, r2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D1, fixedSchema, DECIMAL_9_2));
+    GenericRecord r2fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D2, fixedSchema, DECIMAL_9_2));
+
+    File test = write(fixedRecord, r1fixed, r2fixed);
+    Assert.assertEquals("Should convert fixed to BigDecimals",
+        expected, read(GENERIC, decimalRecord, test));
+  }
+
+  @Test
+  public void testWriteDecimalFixed() throws IOException {
+    Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+    Schema fixedRecord = record("R", field("dec", fixedSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(
+        Schema.createFixed("aFixed", null, null, 4));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D1, fixedSchema, DECIMAL_9_2));
+    GenericRecord r2fixed = instance(fixedRecord, "dec",
+        conversion.toFixed(D2, fixedSchema, DECIMAL_9_2));
+    List<GenericRecord> expected = Arrays.asList(r1fixed, r2fixed);
+
+    File test = write(GENERIC, decimalRecord, r1, r2);
+    Assert.assertEquals("Should read BigDecimals as fixed",
+        expected, read(GENERIC, fixedRecord, test));
+  }
+
+  @Test
+  public void testReadDecimalBytes() throws IOException {
+    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+    Schema bytesRecord = record("R", field("dec", bytesSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+    List<GenericRecord> expected = Arrays.asList(r1, r2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D1, bytesSchema, DECIMAL_9_2));
+    GenericRecord r2bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D2, bytesSchema, DECIMAL_9_2));
+
+    File test = write(bytesRecord, r1bytes, r2bytes);
+    Assert.assertEquals("Should convert bytes to BigDecimals",
+        expected, read(GENERIC, decimalRecord, test));
+  }
+
+  @Test
+  public void testWriteDecimalBytes() throws IOException {
+    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+    Schema bytesRecord = record("R", field("dec", bytesSchema));
+    Schema decimalSchema = DECIMAL_9_2.addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema decimalRecord = record("R", field("dec", decimalSchema));
+
+    GenericRecord r1 = instance(decimalRecord, "dec", D1);
+    GenericRecord r2 = instance(decimalRecord, "dec", D2);
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // use the conversion directly instead of relying on the write side
+    GenericRecord r1bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D1, bytesSchema, DECIMAL_9_2));
+    GenericRecord r2bytes = instance(bytesRecord, "dec",
+        conversion.toBytes(D2, bytesSchema, DECIMAL_9_2));
+
+    List<GenericRecord> expected = Arrays.asList(r1bytes, r2bytes);
+
+    File test = write(GENERIC, decimalRecord, r1, r2);
+    Assert.assertEquals("Should read BigDecimals as bytes",
+        expected, read(GENERIC, bytesRecord, test));
+  }
+
+  private <D> File write(Schema schema, D... data) throws IOException {
+    return write(GenericData.get(), schema, data);
+  }
+
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    return AvroTestUtil.write(temp, model, schema, data);
+  }
+
+}
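
A short sketch (not from the commit) of the conversion layer these decimal
tests exercise: Avro's DecimalConversion maps a BigDecimal to and from the
underlying bytes representation, with the scale taken from the decimal
logical type. fromBytes is the inverse of the toBytes call used above.

  import java.math.BigDecimal;
  import java.nio.ByteBuffer;
  import org.apache.avro.Conversions;
  import org.apache.avro.LogicalType;
  import org.apache.avro.LogicalTypes;
  import org.apache.avro.Schema;

  public class DecimalConversionSketch {
    public static void main(String[] args) {
      Schema bytesSchema = Schema.create(Schema.Type.BYTES);
      LogicalType decimal92 = LogicalTypes.decimal(9, 2);
      Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();

      // encode to the raw representation, then decode back
      ByteBuffer encoded = conversion.toBytes(new BigDecimal("-34.34"), bytesSchema, decimal92);
      BigDecimal decoded = conversion.fromBytes(encoded, bytesSchema, decimal92);
      System.out.println(decoded); // -34.34, scale 2 from the logical type
    }
  }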

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 855a5b1..4fa71ea 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -19,18 +19,23 @@
 package org.apache.parquet.avro;
 
 import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.io.Resources;
 import java.io.File;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericFixed;
@@ -39,12 +44,16 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -225,6 +234,113 @@ public class TestReadWrite {
     assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
   }
 
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  @Test
+  public void testDecimalValues() throws Exception {
+    Schema decimalSchema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.create(Schema.Type.BYTES));
+    decimalSchema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    // add the decimal conversion to a generic data model
+    GenericData decimalSupport = new GenericData();
+    decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
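+    // temp.newFile() creates an empty file; delete it so the Parquet writer can create the file itself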
+    File file = temp.newFile("decimal.parquet");
+    file.delete();
+    Path path = new Path(file.toString());
+
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .withSchema(decimalSchema)
+        .build();
+
+    Random random = new Random(34L);
+    GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema);
+    List<GenericRecord> expected = Lists.newArrayList();
+    for (int i = 0; i < 1000; i += 1) {
+      BigDecimal dec = new BigDecimal(new BigInteger(31, random), 2);
+      builder.set("dec", dec);
+
+      GenericRecord rec = builder.build();
+      expected.add(rec);
+      writer.write(builder.build());
+    }
+    writer.close();
+
+    ParquetReader<GenericRecord> reader = AvroParquetReader
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .disableCompatibility()
+        .build();
+    List<GenericRecord> records = Lists.newArrayList();
+    GenericRecord rec;
+    while ((rec = reader.read()) != null) {
+      records.add(rec);
+    }
+    reader.close();
+
+    Assert.assertTrue("dec field should be a BigDecimal instance",
+        records.get(0).get("dec") instanceof BigDecimal);
+    Assert.assertEquals("Content should match", expected, records);
+  }
+
+  @Test
+  public void testFixedDecimalValues() throws Exception {
+    Schema decimalSchema = Schema.createRecord("myrecord", null, null, false);
+    Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+        Schema.createFixed("dec", null, null, 4));
+    decimalSchema.setFields(Collections.singletonList(
+        new Schema.Field("dec", decimal, null, null)));
+
+    // add the decimal conversion to a generic data model
+    GenericData decimalSupport = new GenericData();
+    decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+    File file = temp.newFile("decimal.parquet");
+    file.delete();
+    Path path = new Path(file.toString());
+
+    ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .withSchema(decimalSchema)
+        .build();
+
+    Random random = new Random(34L);
+    GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema);
+    List<GenericRecord> expected = Lists.newArrayList();
+    for (int i = 0; i < 1000; i += 1) {
+      BigDecimal dec = new BigDecimal(new BigInteger(31, random), 2);
+      builder.set("dec", dec);
+
+      GenericRecord rec = builder.build();
+      expected.add(rec);
+      writer.write(builder.build());
+    }
+    writer.close();
+
+    ParquetReader<GenericRecord> reader = AvroParquetReader
+        .<GenericRecord>builder(path)
+        .withDataModel(decimalSupport)
+        .disableCompatibility()
+        .build();
+    List<GenericRecord> records = Lists.newArrayList();
+    GenericRecord rec;
+    while ((rec = reader.read()) != null) {
+      records.add(rec);
+    }
+    reader.close();
+
+    Assert.assertTrue("dec field should be a BigDecimal instance",
+        records.get(0).get("dec") instanceof BigDecimal);
+    Assert.assertEquals("Content should match", expected, records);
+  }
+
   @Test
   public void testAll() throws Exception {
     Schema schema = new Schema.Parser().parse(
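
Why fixed(4) is large enough for decimal(9, 2) in testFixedDecimalValues
above: a precision-9 unscaled integer needs at most 30 bits of magnitude
plus a sign bit, which rounds up to 4 bytes. A sketch of the arithmetic
(the helper name is hypothetical, not from the commit):

  // bytes needed for a two's-complement unscaled value of the given precision
  static int minBytesForPrecision(int precision) {
    return (int) Math.ceil((precision * Math.log(10) / Math.log(2) + 1) / 8);
  }
  // minBytesForPrecision(9) == 4, matching Schema.createFixed("dec", null, null, 4)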

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
index 64caacc..af6f938 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWriteOldListBehavior.java
@@ -47,7 +47,6 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import sun.net.www.content.text.Generic;
 
 import static org.apache.parquet.avro.AvroTestUtil.array;
 import static org.apache.parquet.avro.AvroTestUtil.optional;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
new file mode 100644
index 0000000..401e698
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReflectLogicalTypes.java
@@ -0,0 +1,705 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.reflect.AvroSchema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificData;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.parquet.avro.AvroTestUtil.read;
+
+/**
+ * This class is based on org.apache.avro.reflect.TestReflectLogicalTypes
+ *
+ * Tests various logical types
+ * * string => UUID
+ * * fixed and bytes => Decimal
+ * * record => Pair
+ */
+public class TestReflectLogicalTypes {
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  public static final ReflectData REFLECT = new ReflectData();
+
+  @BeforeClass
+  public static void addUUID() {
+    REFLECT.addLogicalTypeConversion(new Conversions.UUIDConversion());
+    REFLECT.addLogicalTypeConversion(new Conversions.DecimalConversion());
+  }
+
+  @Test
+  public void testReflectedSchema() {
+    Schema expected = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    expected.getField("uuids").schema().addProp(
+        SpecificData.CLASS_PROP, List.class.getName());
+    LogicalTypes.uuid().addToSchema(
+        expected.getField("uuids").schema().getElementType());
+
+    Schema actual = REFLECT.getSchema(RecordWithUUIDList.class);
+
+    Assert.assertEquals("Should use the UUID logical type", expected, actual);
+  }
+
+  // this can be static because the schema only comes from reflection
+  public static class DecimalRecordBytes {
+    // scale is required and will not be set by the conversion
+    @AvroSchema("{" +
+        "\"type\": \"bytes\"," +
+        "\"logicalType\": \"decimal\"," +
+        "\"precision\": 9," +
+        "\"scale\": 2" +
+        "}")
+    private BigDecimal decimal;
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      DecimalRecordBytes that = (DecimalRecordBytes) other;
+      if (decimal == null) {
+        return (that.decimal == null);
+      }
+
+      return decimal.equals(that.decimal);
+    }
+
+    @Override
+    public int hashCode() {
+      return decimal != null ? decimal.hashCode() : 0;
+    }
+  }
+
+  @Test
+  public void testDecimalBytes() throws IOException {
+    Schema schema = REFLECT.getSchema(DecimalRecordBytes.class);
+    Assert.assertEquals("Should have the correct record name",
+        "org.apache.parquet.avro.TestReflectLogicalTypes$",
+        schema.getNamespace());
+    Assert.assertEquals("Should have the correct record name",
+        "DecimalRecordBytes",
+        schema.getName());
+    Assert.assertEquals("Should have the correct logical type",
+        LogicalTypes.decimal(9, 2),
+        LogicalTypes.fromSchema(schema.getField("decimal").schema()));
+
+    DecimalRecordBytes record = new DecimalRecordBytes();
+    record.decimal = new BigDecimal("3.14");
+
+    File test = write(REFLECT, schema, record);
+    Assert.assertEquals("Should match the decimal after round trip",
+        Arrays.asList(record),
+        read(REFLECT, schema, test));
+  }
+
+  // this can be static because the schema only comes from reflection
+  public static class DecimalRecordFixed {
+    // scale is required and will not be set by the conversion
+    @AvroSchema("{" +
+        "\"name\": \"decimal_9\"," +
+        "\"type\": \"fixed\"," +
+        "\"size\": 4," +
+        "\"logicalType\": \"decimal\"," +
+        "\"precision\": 9," +
+        "\"scale\": 2" +
+        "}")
+    private BigDecimal decimal;
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      DecimalRecordFixed that = (DecimalRecordFixed) other;
+      if (decimal == null) {
+        return (that.decimal == null);
+      }
+
+      return decimal.equals(that.decimal);
+    }
+
+    @Override
+    public int hashCode() {
+      return decimal != null ? decimal.hashCode() : 0;
+    }
+  }
+
+  @Test
+  public void testDecimalFixed() throws IOException {
+    Schema schema = REFLECT.getSchema(DecimalRecordFixed.class);
+    Assert.assertEquals("Should have the correct record name",
+        "org.apache.parquet.avro.TestReflectLogicalTypes$",
+        schema.getNamespace());
+    Assert.assertEquals("Should have the correct record name",
+        "DecimalRecordFixed",
+        schema.getName());
+    Assert.assertEquals("Should have the correct logical type",
+        LogicalTypes.decimal(9, 2),
+        LogicalTypes.fromSchema(schema.getField("decimal").schema()));
+
+    DecimalRecordFixed record = new DecimalRecordFixed();
+    record.decimal = new BigDecimal("3.14");
+
+    File test = write(REFLECT, schema, record);
+    Assert.assertEquals("Should match the decimal after round trip",
+        Arrays.asList(record),
+        read(REFLECT, schema, test));
+  }
+
+  public static class Pair<X, Y> {
+    private final X first;
+    private final Y second;
+
+    private Pair(X first, Y second) {
+      this.first = first;
+      this.second = second;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (this == other) {
+        return true;
+      }
+
+      if (other == null || getClass() != other.getClass()) {
+        return false;
+      }
+
+      Pair<?, ?> that = (Pair<?, ?>) other;
+      if (first == null) {
+        if (that.first != null) {
+          return false;
+        }
+      } else if (!first.equals(that.first)) {
+        return false;
+      }
+
+      if (second == null) {
+        if (that.second != null) {
+          return false;
+        }
+      } else if (!second.equals(that.second)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(new Object[] {first, second});
+    }
+
+    public static <X, Y> Pair<X, Y> of(X first, Y second) {
+      return new Pair<X, Y>(first, second);
+    }
+  }
+
+  public static class PairRecord {
+    @AvroSchema("{" +
+        "\"name\": \"Pair\"," +
+        "\"type\": \"record\"," +
+        "\"fields\": [" +
+        "    {\"name\": \"x\", \"type\": \"long\"}," +
+        "    {\"name\": \"y\", \"type\": \"long\"}" +
+        "  ]," +
+        "\"logicalType\": \"pair\"" +
+        "}")
+    Pair<Long, Long> pair;
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testPairRecord() throws IOException {
+    ReflectData model = new ReflectData();
+    model.addLogicalTypeConversion(new Conversion<Pair>() {
+      @Override
+      public Class<Pair> getConvertedType() {
+        return Pair.class;
+      }
+
+      @Override
+      public String getLogicalTypeName() {
+        return "pair";
+      }
+
+      @Override
+      public Pair fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+        return Pair.of(value.get(0), value.get(1));
+      }
+
+      @Override
+      public IndexedRecord toRecord(Pair value, Schema schema, LogicalType type) {
+        GenericData.Record record = new GenericData.Record(schema);
+        record.put(0, value.first);
+        record.put(1, value.second);
+        return record;
+      }
+    });
+
+    LogicalTypes.register("pair", new LogicalTypes.LogicalTypeFactory() {
+      private final LogicalType PAIR = new LogicalType("pair");
+      @Override
+      public LogicalType fromSchema(Schema schema) {
+        return PAIR;
+      }
+    });
+
+    Schema schema = model.getSchema(PairRecord.class);
+    Assert.assertEquals("Should have the correct record name",
+        "org.apache.parquet.avro.TestReflectLogicalTypes$",
+        schema.getNamespace());
+    Assert.assertEquals("Should have the correct record name",
+        "PairRecord",
+        schema.getName());
+    Assert.assertEquals("Should have the correct logical type",
+        "pair",
+        LogicalTypes.fromSchema(schema.getField("pair").schema()).getName());
+
+    PairRecord record = new PairRecord();
+    record.pair = Pair.of(34L, 35L);
+    List<PairRecord> expected = new ArrayList<PairRecord>();
+    expected.add(record);
+
+    File test = write(model, schema, record);
+    Pair<Long, Long> actual = AvroTestUtil
+        .<PairRecord>read(model, schema, test)
+        .get(0).pair;
+    Assert.assertEquals("Data should match after serialization round-trip",
+        34L, (long) actual.first);
+    Assert.assertEquals("Data should match after serialization round-trip",
+        35L, (long) actual.second);
+  }
+
+  @Test
+  public void testReadUUID() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithStringUUID r1 = new RecordWithStringUUID();
+    r1.uuid = u1.toString();
+    RecordWithStringUUID r2 = new RecordWithStringUUID();
+    r2.uuid = u2.toString();
+
+    List<RecordWithUUID> expected = Arrays.asList(
+        new RecordWithUUID(), new RecordWithUUID());
+    expected.get(0).uuid = u1;
+    expected.get(1).uuid = u2;
+
+    File test = write(
+        ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected, read(REFLECT, uuidSchema, test));
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+
+    Assert.assertEquals("Should not convert to UUID if accessor is String",
+        Arrays.asList(r1, r2),
+        read(REFLECT, uuidStringSchema, test));
+  }
+
+  @Test
+  public void testWriteUUID() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithUUID r1 = new RecordWithUUID();
+    r1.uuid = u1;
+    RecordWithUUID r2 = new RecordWithUUID();
+    r2.uuid = u2;
+
+    List<RecordWithStringUUID> expected = Arrays.asList(
+        new RecordWithStringUUID(), new RecordWithStringUUID());
+    expected.get(0).uuid = u1.toString();
+    expected.get(1).uuid = u2.toString();
+
+    File test = write(REFLECT, uuidSchema, r1, r2);
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+
+    Assert.assertEquals("Should read uuid as String without UUID conversion",
+        expected,
+        read(REFLECT, uuidStringSchema, test));
+
+    LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+    Assert.assertEquals("Should read uuid as String without UUID logical type",
+        expected,
+        read(ReflectData.get(), uuidStringSchema, test));
+  }
+
+  @Test
+  public void testWriteNullableUUID() throws IOException {
+    Schema nullableUuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().optionalString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(
+        nullableUuidSchema.getField("uuid").schema().getTypes().get(1));
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithUUID r1 = new RecordWithUUID();
+    r1.uuid = u1;
+    RecordWithUUID r2 = new RecordWithUUID();
+    r2.uuid = u2;
+
+    List<RecordWithStringUUID> expected = Arrays.asList(
+        new RecordWithStringUUID(), new RecordWithStringUUID());
+    expected.get(0).uuid = u1.toString();
+    expected.get(1).uuid = u2.toString();
+
+    File test = write(REFLECT, nullableUuidSchema, r1, r2);
+
+    // verify that the field's type overrides the logical type
+    Schema nullableUuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().optionalString("uuid").endRecord();
+
+    Assert.assertEquals("Should read uuid as String without UUID conversion",
+        expected,
+        read(REFLECT, nullableUuidStringSchema, test));
+  }
+
+  @Test(expected = ClassCastException.class)
+  public void testWriteUUIDMissingLogicalType() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record(RecordWithUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithUUID r1 = new RecordWithUUID();
+    r1.uuid = u1;
+    RecordWithUUID r2 = new RecordWithUUID();
+    r2.uuid = u2;
+
+    // write without using REFLECT, which has the UUID conversion registered
+    File test = write(uuidSchema, r1, r2);
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+
+    // this fails with a ClassCastException because the UUID isn't
+    // converted to the CharSequence expected internally
+    read(ReflectData.get(), uuidStringSchema, test);
+  }
+
+  @Test
+  public void testReadUUIDGenericRecord() throws IOException {
+    Schema uuidSchema = SchemaBuilder.record("RecordWithUUID")
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidSchema.getField("uuid").schema());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    RecordWithStringUUID r1 = new RecordWithStringUUID();
+    r1.uuid = u1.toString();
+    RecordWithStringUUID r2 = new RecordWithStringUUID();
+    r2.uuid = u2.toString();
+
+    List<GenericData.Record> expected = Arrays.asList(
+        new GenericData.Record(uuidSchema), new GenericData.Record(uuidSchema));
+    expected.get(0).put("uuid", u1);
+    expected.get(1).put("uuid", u2);
+
+    File test = write(
+        ReflectData.get().getSchema(RecordWithStringUUID.class), r1, r2);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected, read(REFLECT, uuidSchema, test));
+
+    // verify that the field's type overrides the logical type
+    Schema uuidStringSchema = SchemaBuilder
+        .record(RecordWithStringUUID.class.getName())
+        .fields().requiredString("uuid").endRecord();
+    LogicalTypes.uuid().addToSchema(uuidStringSchema.getField("uuid").schema());
+
+    Assert.assertEquals("Should not convert to UUID if accessor is String",
+        Arrays.asList(r1, r2),
+        read(REFLECT, uuidStringSchema, test));
+  }
+
+  @Test
+  public void testReadUUIDArray() throws IOException {
+    Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    LogicalTypes.uuid().addToSchema(
+        uuidArraySchema.getField("uuids").schema().getElementType());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord r = new GenericData.Record(uuidArraySchema);
+    r.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+    RecordWithUUIDArray expected = new RecordWithUUIDArray();
+    expected.uuids = new UUID[] {u1, u2};
+
+    File test = write(uuidArraySchema, r);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected,
+        read(REFLECT, uuidArraySchema, test).get(0));
+  }
+
+  @Test
+  public void testWriteUUIDArray() throws IOException {
+    Schema uuidArraySchema = SchemaBuilder.record(RecordWithUUIDArray.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    LogicalTypes.uuid().addToSchema(
+        uuidArraySchema.getField("uuids").schema().getElementType());
+
+    Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray")
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    stringArraySchema.getField("uuids").schema()
+        .addProp(SpecificData.CLASS_PROP, List.class.getName());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord expected = new GenericData.Record(stringArraySchema);
+    List<String> uuids = new ArrayList<String>();
+    uuids.add(u1.toString());
+    uuids.add(u2.toString());
+    expected.put("uuids", uuids);
+
+    RecordWithUUIDArray r = new RecordWithUUIDArray();
+    r.uuids = new UUID[] {u1, u2};
+
+    File test = write(REFLECT, uuidArraySchema, r);
+
+    Assert.assertEquals("Should read UUIDs as Strings",
+        expected,
+        read(ReflectData.get(), stringArraySchema, test).get(0));
+  }
+
+  @Test
+  public void testReadUUIDList() throws IOException {
+    Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    uuidListSchema.getField("uuids").schema().addProp(
+        SpecificData.CLASS_PROP, List.class.getName());
+    LogicalTypes.uuid().addToSchema(
+        uuidListSchema.getField("uuids").schema().getElementType());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord r = new GenericData.Record(uuidListSchema);
+    r.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+    RecordWithUUIDList expected = new RecordWithUUIDList();
+    expected.uuids = Arrays.asList(u1, u2);
+
+    File test = write(uuidListSchema, r);
+
+    Assert.assertEquals("Should convert Strings to UUIDs",
+        expected, read(REFLECT, uuidListSchema, test).get(0));
+  }
+
+  @Test
+  public void testWriteUUIDList() throws IOException {
+    Schema uuidListSchema = SchemaBuilder.record(RecordWithUUIDList.class.getName())
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    uuidListSchema.getField("uuids").schema().addProp(
+        SpecificData.CLASS_PROP, List.class.getName());
+    LogicalTypes.uuid().addToSchema(
+        uuidListSchema.getField("uuids").schema().getElementType());
+
+    Schema stringArraySchema = SchemaBuilder.record("RecordWithUUIDArray")
+        .fields()
+        .name("uuids").type().array().items().stringType().noDefault()
+        .endRecord();
+    stringArraySchema.getField("uuids").schema()
+        .addProp(SpecificData.CLASS_PROP, List.class.getName());
+
+    UUID u1 = UUID.randomUUID();
+    UUID u2 = UUID.randomUUID();
+
+    GenericRecord expected = new GenericData.Record(stringArraySchema);
+    expected.put("uuids", Arrays.asList(u1.toString(), u2.toString()));
+
+    RecordWithUUIDList r = new RecordWithUUIDList();
+    r.uuids = Arrays.asList(u1, u2);
+
+    File test = write(REFLECT, uuidListSchema, r);
+
+    Assert.assertEquals("Should read UUIDs as Strings",
+        expected,
+        read(REFLECT, stringArraySchema, test).get(0));
+  }
+
+  private <D> File write(Schema schema, D... data) throws IOException {
+    return write(ReflectData.get(), schema, data);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <D> File write(GenericData model, Schema schema, D... data) throws IOException {
+    return AvroTestUtil.write(temp, model, schema, data);
+  }
+}
+
+class RecordWithUUID {
+  UUID uuid;
+
+  @Override
+  public int hashCode() {
+    return uuid.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithUUID)) {
+      return false;
+    }
+    RecordWithUUID that = (RecordWithUUID) obj;
+    return this.uuid.equals(that.uuid);
+  }
+}
+
+class RecordWithStringUUID {
+  String uuid;
+
+  @Override
+  public int hashCode() {
+    return uuid.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithStringUUID)) {
+      return false;
+    }
+    RecordWithStringUUID that = (RecordWithStringUUID) obj;
+    return this.uuid.equals(that.uuid);
+  }
+}
+
+class RecordWithUUIDArray {
+  UUID[] uuids;
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(uuids);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithUUIDArray)) {
+      return false;
+    }
+    RecordWithUUIDArray that = (RecordWithUUIDArray) obj;
+    return Arrays.equals(this.uuids, that.uuids);
+  }
+}
+
+class RecordWithUUIDList {
+  List<UUID> uuids;
+
+  @Override
+  public int hashCode() {
+    return uuids.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof RecordWithUUIDList)) {
+      return false;
+    }
+    RecordWithUUIDList that = (RecordWithUUIDList) obj;
+    return this.uuids.equals(that.uuids);
+  }
+}
+
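
For context on the round-trips above: they rely on Avro 1.8's conversion API, in which a Conversion registered on the data model translates between the in-memory type (java.util.UUID) and the written Avro type (string) wherever a schema carries the matching logical type. A minimal sketch of that wiring, assuming Avro 1.8's built-in Conversions.UUIDConversion:

    import org.apache.avro.Conversions;
    import org.apache.avro.LogicalTypes;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.reflect.ReflectData;

    public class UuidConversionSketch {
      // A string schema annotated with the uuid logical type, as in the
      // tests above.
      public static Schema uuidSchema() {
        return LogicalTypes.uuid().addToSchema(SchemaBuilder.builder().stringType());
      }

      // A reflect model that maps String (on disk) to java.util.UUID
      // (in memory) for fields carrying the uuid logical type.
      public static ReflectData uuidModel() {
        ReflectData model = new ReflectData();
        model.addLogicalTypeConversion(new Conversions.UUIDConversion());
        return model;
      }
    }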

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index 4a167bd..30787f0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -25,6 +25,10 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
 import org.apache.parquet.io.ParquetEncodingException;
@@ -209,26 +213,33 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   }
 
-  private static class FromStringBinary extends ByteArrayBackedBinary {
-    public FromStringBinary(String value) {
-      // reused is false, because we do not
-      // hold on to the underlying bytes,
-      // and nobody else has a handle to them
+  private static class FromStringBinary extends ByteBufferBackedBinary {
+    public FromStringBinary(CharSequence value) {
+      // reused is false, because we do not hold on to the buffer after
+      // conversion, and nobody else has a handle to it
       super(encodeUTF8(value), false);
     }
 
-    private static byte[] encodeUTF8(String value) {
-      try {
-        return value.getBytes("UTF-8");
-      } catch (UnsupportedEncodingException e) {
-        throw new ParquetEncodingException("UTF-8 not supported.", e);
-      }
-    }
-
     @Override
     public String toString() {
       return "Binary{\"" + toStringUsingUTF8() + "\"}";
     }
+
+    private static final ThreadLocal<CharsetEncoder> ENCODER =
+        new ThreadLocal<CharsetEncoder>() {
+          @Override
+          protected CharsetEncoder initialValue() {
+            return StandardCharsets.UTF_8.newEncoder();
+          }
+        };
+
+    private static ByteBuffer encodeUTF8(CharSequence value) {
+      try {
+        return ENCODER.get().encode(CharBuffer.wrap(value));
+      } catch (CharacterCodingException e) {
+        throw new ParquetEncodingException("UTF-8 encoding failed", e);
+      }
+    }
   }
 
   public static Binary fromReusedByteArray(final byte[] value, final int offset, final int length) {
@@ -359,6 +370,13 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     private int offset;
     private int length;
 
+    public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) {
+      this.value = value;
+      this.offset = value.position();
+      this.length = value.remaining();
+      this.isBackingBytesReused = isBackingBytesReused;
+    }
+
     public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean isBackingBytesReused) {
       this.value = value;
       this.offset = offset;
@@ -521,11 +539,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
   }
 
   public static Binary fromReusedByteBuffer(final ByteBuffer value) {
-    return new ByteBufferBackedBinary(value, value.position(), value.remaining(), true);
+    return new ByteBufferBackedBinary(value, true);
   }
 
   public static Binary fromConstantByteBuffer(final ByteBuffer value) {
-    return new ByteBufferBackedBinary(value, value.position(), value.remaining(), false);
+    return new ByteBufferBackedBinary(value, false);
   }
 
   @Deprecated
@@ -536,7 +554,12 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     return fromReusedByteBuffer(value); // Assume producer intends to reuse byte[]
   }
 
-  public static Binary fromString(final String value) {
+  public static Binary fromString(String value) {
+    // this method is for binary backward-compatibility
+    return fromString((CharSequence) value);
+  }
+
+  public static Binary fromString(CharSequence value) {
     return new FromStringBinary(value);
   }
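
The FromStringBinary path above now encodes through a per-thread CharsetEncoder instead of String.getBytes("UTF-8"), which skips the per-call charset lookup and lets any CharSequence be encoded without first materializing a String. A small usage sketch (class name and values are hypothetical):

    import org.apache.parquet.io.api.Binary;

    public class FromStringSketch {
      public static void main(String[] args) {
        // The CharSequence overload accepts a StringBuilder directly,
        // so no intermediate String copy is needed before encoding.
        StringBuilder sb = new StringBuilder("value-").append(42);
        Binary b = Binary.fromString(sb);

        // The String overload is kept for binary compatibility and
        // delegates to the CharSequence version; both yield equal bytes.
        Binary same = Binary.fromString(sb.toString());
        assert b.equals(same);
      }
    }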
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index d3cc97b..9af71af 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -336,9 +336,9 @@ public class Types {
      * @param length an int length
      * @return this builder for method chaining
      */
-    public BasePrimitiveBuilder<P, THIS> length(int length) {
+    public THIS length(int length) {
       this.length = length;
-      return this;
+      return self();
     }
 
     /**
@@ -351,9 +351,9 @@ public class Types {
      * @param precision an int precision value for the DECIMAL
      * @return this builder for method chaining
      */
-    public BasePrimitiveBuilder<P, THIS> precision(int precision) {
+    public THIS precision(int precision) {
       this.precision = precision;
-      return this;
+      return self();
     }
 
     /**
@@ -369,9 +369,9 @@ public class Types {
      * @param scale an int scale value for the DECIMAL
      * @return this builder for method chaining
      */
-    public BasePrimitiveBuilder<P, THIS> scale(int scale) {
+    public THIS scale(int scale) {
       this.scale = scale;
-      return this;
+      return self();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
index 88aa4b2..a541e1b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -198,16 +198,6 @@ public class TestBinary {
     Binary copy = bao.binary.copy();
 
     assertSame(copy, bao.binary);
-
-    mutate(bao.original);
-
-    byte[] expected = testString.getBytes(UTF8);
-    mutate(expected);
-
-    assertArrayEquals(expected, copy.getBytes());
-    assertArrayEquals(expected, copy.getBytesUnsafe());
-    assertArrayEquals(expected, copy.copy().getBytesUnsafe());
-    assertArrayEquals(expected, copy.copy().getBytes());
   }
 
   private void testReusedCopy(BinaryFactory bf) throws Exception {
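
The assertions removed above relied on the string-built Binary sharing the caller-visible byte array; after the switch to an encoder-produced ByteBuffer there is no such sharing, so mutating the source array no longer shows through, while copy() on a non-reused Binary still returns the same instance (the assertSame kept above). The reused-vs-constant contract, sketched with hypothetical values:

    import java.nio.charset.StandardCharsets;
    import org.apache.parquet.io.api.Binary;

    public class CopySemanticsSketch {
      public static void main(String[] args) {
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);

        // Reused: the caller may mutate bytes later, so copy() makes a
        // defensive copy of the backing array.
        Binary reused = Binary.fromReusedByteArray(bytes, 0, bytes.length);
        Binary snapshot = reused.copy();
        bytes[0] = 'H';
        // snapshot still reads "hello"; reused reflects the mutation.

        // Constant: the bytes are promised not to change, so copy() can
        // return the same instance.
        Binary constant = Binary.fromConstantByteArray("world".getBytes(StandardCharsets.UTF_8));
        assert constant.copy() == constant;
      }
    }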

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b24a1d1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f606faa..9314a53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -93,6 +93,7 @@
     <fastutil.version>6.5.7</fastutil.version>
     <semver.api.version>0.9.33</semver.api.version>
     <slf4j.version>1.7.5</slf4j.version>
+    <avro.version>1.8.0</avro.version>
   </properties>
 
   <modules>

