http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
new file mode 100644
index 0000000..f76f367
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.InvalidRecordException;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
+
+class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter {
+
+ private final ParentValueContainer parent;
+ protected T currentRecord;
+ private final Converter[] converters;
+
+ private final Schema avroSchema;
+ private final Class<? extends IndexedRecord> specificClass;
+
+ private final GenericData model;
+ private final Map<Schema.Field, Object> recordDefaults = new HashMap<Schema.Field, Object>();
+
+ public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) { // convenience overload: null parent, SpecificData.get() as the model
+ this(parquetSchema, avroSchema, SpecificData.get());
+ }
+
+ public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema,
+ GenericData baseModel) { // overload without a parent container
+ this(null, parquetSchema, avroSchema, baseModel); // null parent: end() will not call parent.add
+ }
+
+ public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
+ parquetSchema, Schema avroSchema) { // overload with a parent; SpecificData.get() as the model
+ this(parent, parquetSchema, avroSchema, SpecificData.get());
+ }
+
+ public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
+     parquetSchema, Schema avroSchema, GenericData baseModel) { // builds one child converter per Parquet field
+   this.parent = parent; // may be null; end() then skips parent.add
+   this.avroSchema = avroSchema;
+   this.converters = new Converter[parquetSchema.getFieldCount()];
+   this.specificClass = baseModel instanceof SpecificData ? ((SpecificData) baseModel).getClass(avroSchema) : null;
+   this.model = this.specificClass == null ? GenericData.get() : baseModel; // no specific class: generic model
+   Map<String, Integer> avroFieldIndexes = new HashMap<String, Integer>(); // Avro fields not yet claimed by a Parquet field
+   int avroFieldIndex = 0;
+   for (Schema.Field field : avroSchema.getFields()) {
+     avroFieldIndexes.put(field.name(), avroFieldIndex++);
+   }
+   int parquetFieldIndex = 0;
+   for (Type parquetField : parquetSchema.getFields()) {
+     Schema.Field avroField = getAvroField(parquetField.getName());
+     Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema());
+     Integer avroIndex = avroFieldIndexes.remove(avroField.name());
+     if (avroIndex == null) { // claimed by an earlier Parquet field; unboxing null would be a bare NullPointerException
+       throw new InvalidRecordException(String.format("Parquet/Avro schema mismatch. " +
+           "Avro field '%s' is matched by more than one Parquet field.", avroField.name()));
+     }
+     final int finalAvroIndex = avroIndex;
+     converters[parquetFieldIndex++] = newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() {
+       @Override
+       void add(Object value) {
+         AvroIndexedRecordConverter.this.set(finalAvroIndex, value);
+       }
+     });
+   }
+   // store defaults for any new Avro fields from avroSchema that are not in the writer schema (parquetSchema)
+   for (String fieldName : avroFieldIndexes.keySet()) {
+     Schema.Field field = avroSchema.getField(fieldName);
+     if (field.schema().getType() != Schema.Type.NULL && field.defaultValue() != null) { // skip null-typed fields and fields without a default
+       Object defaultValue = model.getDefaultValue(field); // resolved once, reused by fillInDefaults()
+       if (defaultValue != null) {
+         recordDefaults.put(field, defaultValue);
+       }
+     }
+   }
+ }
+
+ /** Finds the Avro field for a Parquet field name; an alias match is returned ahead of a name match. */
+ private Schema.Field getAvroField(String parquetFieldName) {
+   for (Schema.Field candidate : avroSchema.getFields()) {
+     if (candidate.aliases().contains(parquetFieldName)) {
+       return candidate;
+     }
+   }
+   Schema.Field byName = avroSchema.getField(parquetFieldName);
+   if (byName != null) {
+     return byName;
+   }
+   throw new InvalidRecordException(String.format("Parquet/Avro schema mismatch. Avro field '%s' not found.", parquetFieldName));
+ }
+
+ /**
+  * Creates the converter that matches the Avro type of {@code schema}.
+  * Record, array, map and union types get group converters.
+  *
+  * @param schema Avro schema of the value to convert
+  * @param type Parquet type of the same value
+  * @param model data model handed on to the converters that need one
+  * @param parent container that receives each converted value
+  * @return a new converter for the value
+  * @throws UnsupportedOperationException if the Avro type has no converter
+  */
+ private static Converter newConverter(Schema schema, Type type,
+     GenericData model, ParentValueContainer parent) {
+   switch (schema.getType()) {
+     case BOOLEAN: return new FieldBooleanConverter(parent);
+     case INT: return new FieldIntegerConverter(parent);
+     case LONG: return new FieldLongConverter(parent);
+     case FLOAT: return new FieldFloatConverter(parent);
+     case DOUBLE: return new FieldDoubleConverter(parent);
+     case BYTES: return new FieldBytesConverter(parent);
+     case STRING: return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8);
+     case RECORD: return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema, model);
+     case ENUM: return new FieldEnumConverter(parent, schema, model);
+     case ARRAY: return new AvroArrayConverter(parent, type.asGroupType(), schema, model);
+     case MAP: return new MapConverter(parent, type.asGroupType(), schema, model);
+     case UNION: return new AvroUnionConverter(parent, type, schema, model);
+     case FIXED: return new FieldFixedConverter(parent, schema, model);
+     default:
+       throw new UnsupportedOperationException(String.format("Cannot convert Avro type: %s" +
+           " (Parquet type: %s) ", schema, type));
+   }
+ }
+
+ private void set(int index, Object value) { // stores a converted value at the given field position of the current record
+ this.currentRecord.put(index, value);
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) { // fieldIndex is the Parquet field position used when the array was filled
+ return converters[fieldIndex];
+ }
+
+ @Override
+ public void start() { // begins a record: allocates the instance that the field converters fill
+ // Should do the right thing whether it is generic or specific
+ this.currentRecord = (T) ((this.specificClass == null) ?
+ new GenericData.Record(avroSchema) : // no specific class was resolved: build a generic record
+ ((SpecificData) model).newInstance(specificClass, avroSchema));
+ }
+
+ @Override
+ public void end() { // completes the current record
+ fillInDefaults(); // puts the cached defaults into the record
+ if (parent != null) { // parent is null when built through the MessageType constructors
+ parent.add(currentRecord);
+ }
+ }
+
+ private void fillInDefaults() { // writes every cached default value into the current record
+ for (Map.Entry<Schema.Field, Object> entry : recordDefaults.entrySet()) {
+ Schema.Field f = entry.getKey();
+ // replace following with model.deepCopy once AVRO-1455 is being used
+ Object defaultValue = deepCopy(f.schema(), entry.getValue());
+ this.currentRecord.put(f.pos(), defaultValue); // stored at the position reported by the Avro field
+ }
+ }
+
+ private Object deepCopy(Schema schema, Object value) { // copies a value unless its type is one of the primitive cases below
+ switch (schema.getType()) {
+ case BOOLEAN:
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ return value; // these types are handed back without copying
+ default:
+ return model.deepCopy(schema, value); // every other type is copied through the data model
+ }
+ }
+
+ T getCurrentRecord() { // returns the record assigned by the most recent start(); null before the first start()
+ return currentRecord;
+ }
+
+ static abstract class ParentValueContainer { // callback through which a converter hands its value to its owner
+
+ /**
+ * Adds the value to the parent. Converters call this with each converted value.
+ */
+ abstract void add(Object value);
+
+ }
+
+ /** Primitive converter that hands each boolean to its parent container. */
+ static final class FieldBooleanConverter extends PrimitiveConverter {
+   private final ParentValueContainer target;
+
+   public FieldBooleanConverter(ParentValueContainer parent) {
+     target = parent;
+   }
+
+   /** Boxes the value and adds it to the parent. */
+   @Override
+   public void addBoolean(boolean value) {
+     target.add(Boolean.valueOf(value));
+   }
+ }
+
+ /** Primitive converter that hands each int to its parent container. */
+ static final class FieldIntegerConverter extends PrimitiveConverter {
+   private final ParentValueContainer target;
+
+   public FieldIntegerConverter(ParentValueContainer parent) {
+     target = parent;
+   }
+
+   /** Boxes the value and adds it to the parent. */
+   @Override
+   public void addInt(int value) {
+     target.add(Integer.valueOf(value));
+   }
+ }
+
+ /** Primitive converter that adds Long values to its parent; int input is accepted too. */
+ static final class FieldLongConverter extends PrimitiveConverter {
+   private final ParentValueContainer target;
+
+   public FieldLongConverter(ParentValueContainer parent) {
+     target = parent;
+   }
+
+   /** Widens the int to a long before adding it. */
+   @Override
+   public void addInt(int value) {
+     target.add(Long.valueOf((long) value));
+   }
+
+   @Override
+   public void addLong(long value) {
+     target.add(Long.valueOf(value));
+   }
+ }
+
+ /** Primitive converter that adds Float values to its parent; int and long input is converted. */
+ static final class FieldFloatConverter extends PrimitiveConverter {
+   private final ParentValueContainer target;
+
+   public FieldFloatConverter(ParentValueContainer parent) {
+     target = parent;
+   }
+
+   /** Converts the int to a float, then adds it. */
+   @Override
+   public void addInt(int value) {
+     target.add(Float.valueOf((float) value));
+   }
+
+   @Override
+   public void addLong(long value) {
+     target.add(Float.valueOf((float) value));
+   }
+
+   @Override
+   public void addFloat(float value) {
+     target.add(Float.valueOf(value));
+   }
+ }
+
+ /** Primitive converter that adds Double values to its parent; int, long and float input is converted. */
+ static final class FieldDoubleConverter extends PrimitiveConverter {
+   private final ParentValueContainer target;
+
+   public FieldDoubleConverter(ParentValueContainer parent) {
+     target = parent;
+   }
+
+   /** Converts the int to a double, then adds it. */
+   @Override
+   public void addInt(int value) {
+     target.add(Double.valueOf((double) value));
+   }
+
+   @Override
+   public void addLong(long value) {
+     target.add(Double.valueOf((double) value));
+   }
+
+   @Override
+   public void addFloat(float value) {
+     target.add(Double.valueOf((double) value));
+   }
+
+   @Override
+   public void addDouble(double value) {
+     target.add(Double.valueOf(value));
+   }
+ }
+
+ static final class FieldBytesConverter extends PrimitiveConverter {
+
+ private final ParentValueContainer parent;
+
+ public FieldBytesConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(ByteBuffer.wrap(value.getBytes()));
+ }
+
+ }
+
+ static final class FieldStringConverter extends PrimitiveConverter {
+
+ private final ParentValueContainer parent;
+ private final boolean dictionarySupport;
+ private String[] dict;
+
+ public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) {
+ this.parent = parent;
+ this.dictionarySupport = dictionarySupport;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(value.toStringUsingUTF8());
+ }
+
+ @Override
+ public boolean hasDictionarySupport() {
+ return dictionarySupport;
+ }
+
+ @Override
+ public void setDictionary(Dictionary dictionary) {
+ dict = new String[dictionary.getMaxId() + 1];
+ for (int i = 0; i <= dictionary.getMaxId(); i++) {
+ dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8();
+ }
+ }
+
+ @Override
+ public void addValueFromDictionary(int dictionaryId) {
+ parent.add(dict[dictionaryId]);
+ }
+ }
+
+ static final class FieldEnumConverter extends PrimitiveConverter { // converts binary enum symbols to Enum constants or Strings
+
+ private final ParentValueContainer parent;
+ private final Class<? extends Enum> enumClass; // may be null; addBinary then passes the symbol on as a String
+
+ public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema,
+ GenericData model) {
+ this.parent = parent;
+ this.enumClass = model instanceof SpecificData ? // look the class up in the given model when it is a SpecificData
+ ((SpecificData) model).getClass(enumSchema) :
+ SpecificData.get().getClass(enumSchema); // otherwise fall back to the shared SpecificData instance
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ Object enumValue = value.toStringUsingUTF8(); // the symbol name as text
+ if (enumClass != null) {
+ enumValue = (Enum.valueOf(enumClass,(String)enumValue)); // resolve the constant with that name
+ }
+ parent.add(enumValue);
+ }
+ }
+
+ static final class FieldFixedConverter extends PrimitiveConverter { // converts binary values to generic or specific fixed objects
+
+ private final ParentValueContainer parent;
+ private final Schema avroSchema;
+ private final Class<? extends GenericData.Fixed> fixedClass; // null: GenericData.Fixed is created instead
+ private final Constructor fixedClassCtor; // byte[] constructor of fixedClass; null when fixedClass is null
+
+ public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema,
+ GenericData model) {
+ this.parent = parent;
+ this.avroSchema = avroSchema;
+ this.fixedClass = model instanceof SpecificData ? // look the class up in the given model when it is a SpecificData
+ ((SpecificData) model).getClass(avroSchema) :
+ SpecificData.get().getClass(avroSchema); // otherwise fall back to the shared SpecificData instance
+ if (fixedClass != null) {
+ try {
+ this.fixedClassCtor =
+ fixedClass.getConstructor(new Class[] { byte[].class }); // looked up once here, reused by every addBinary call
+ } catch (Exception e) {
+ throw new RuntimeException(e); // lookup failure is rethrown unchecked with the cause attached
+ }
+ } else {
+ this.fixedClassCtor = null;
+ }
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ if (fixedClass == null) {
+ parent.add(new GenericData.Fixed(avroSchema, value.getBytes()));
+ } else {
+ if (fixedClassCtor == null) { // defensive: the constructor sets it whenever fixedClass is non-null
+ throw new IllegalArgumentException(
+ "fixedClass specified but fixedClassCtor is null.");
+ }
+ try {
+ Object fixed = fixedClassCtor.newInstance(value.getBytes());
+ parent.add(fixed);
+ } catch (Exception e) {
+ throw new RuntimeException(e); // instantiation failure is rethrown unchecked with the cause attached
+ }
+ }
+ }
+ }
+
+ /**
+ * Converter for a list.
+ *
+ * <pre>
+ * optional group the_list (LIST) { <-- this layer
+ * repeated group array {
+ * optional (type) element;
+ * }
+ * }
+ * </pre>
+ *
+ * This class also implements the LIST element backward-compatibility rules, see {@link #isElementType(Type, Schema)}.
+ *
+ * @param <T> The type of elements in the list
+ */
+ static final class AvroArrayConverter<T> extends GroupConverter {
+
+ private final ParentValueContainer parent;
+ private final Schema avroSchema;
+ private final Converter converter; // converter for the repeated field (field 0 of the list group)
+ private GenericArray<T> array; // list being built; replaced on every start()
+
+ public AvroArrayConverter(ParentValueContainer parent, GroupType type,
+ Schema avroSchema, GenericData model) {
+ this.parent = parent;
+ this.avroSchema = avroSchema;
+ Schema elementSchema = this.avroSchema.getElementType();
+ Type repeatedType = type.getType(0);
+ // always determine whether the repeated type is the element type by
+ // matching it against the element schema.
+ if (isElementType(repeatedType, elementSchema)) {
+ // the element type is the repeated type (and required)
+ converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() {
+ @Override
+ @SuppressWarnings("unchecked")
+ void add(Object value) {
+ array.add((T) value); // values go straight into the list
+ }
+ });
+ } else {
+ // the element is wrapped in a synthetic group and may be optional
+ converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model);
+ }
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) { // fieldIndex is ignored: there is a single converter
+ return converter;
+ }
+
+ @Override
+ public void start() {
+ array = new GenericData.Array<T>(0, avroSchema); // fresh array for each list
+ }
+
+ @Override
+ public void end() {
+ parent.add(array); // hand the finished list to the parent
+ }
+
+ /**
+ * Returns whether the given type is the element type of a list or is a
+ * synthetic group with one field that is the element type. This is
+ * determined by checking whether the type can be a synthetic group and by
+ * checking whether a potential synthetic group matches the expected schema.
+ * <p>
+ * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this
+ * method never guesses because the expected schema is known.
+ *
+ * @param repeatedType a type that may be the element type
+ * @param elementSchema the expected Schema for list elements
+ * @return {@code true} if the repeatedType is the element schema
+ */
+ static boolean isElementType(Type repeatedType, Schema elementSchema) {
+ if (repeatedType.isPrimitive() ||
+ repeatedType.asGroupType().getFieldCount() > 1) {
+ // The repeated type must be the element type because it is an invalid
+ // synthetic wrapper (must be a group with one field).
+ return true;
+ } else if (elementSchema != null &&
+ elementSchema.getType() == Schema.Type.RECORD &&
+ elementSchema.getFields().size() == 1 &&
+ elementSchema.getFields().get(0).name().equals(
+ repeatedType.asGroupType().getFieldName(0))) {
+ // The repeated type must be the element type because it matches the
+ // structure of the Avro element's schema.
+ return true;
+ }
+ return false; // otherwise the repeated group is treated as a synthetic wrapper
+ }
+
+ /**
+ * Converter for list elements.
+ *
+ * <pre>
+ * optional group the_list (LIST) {
+ * repeated group array { <-- this layer
+ * optional (type) element;
+ * }
+ * }
+ * </pre>
+ */
+ final class ElementConverter extends GroupConverter {
+ private T element; // value captured for the current repetition; null if none arrived
+ private final Converter elementConverter;
+
+ public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) {
+ Type elementType = repeatedType.getType(0); // the wrapper's single field
+ Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema);
+ this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() {
+ @Override
+ @SuppressWarnings("unchecked")
+ void add(Object value) {
+ ElementConverter.this.element = (T) value;
+ }
+ });
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ Preconditions.checkArgument(
+ fieldIndex == 0, "Illegal field index: " + fieldIndex);
+ return elementConverter;
+ }
+
+ @Override
+ public void start() {
+ element = null; // reset so a missing element is added as null
+ }
+
+ @Override
+ public void end() {
+ array.add(element); // added even when null
+ }
+ }
+ }
+
+ static final class AvroUnionConverter<T> extends GroupConverter { // converts a union stored as a group with one field per non-null member
+
+ private final ParentValueContainer parent;
+ private final Converter[] memberConverters; // indexed by Parquet field position
+ private Object memberValue = null; // the member value seen since start(); null if none
+
+ public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema,
+ Schema avroSchema, GenericData model) {
+ this.parent = parent;
+ GroupType parquetGroup = parquetSchema.asGroupType();
+ this.memberConverters = new Converter[ parquetGroup.getFieldCount()];
+
+ int parquetIndex = 0;
+ for (int index = 0; index < avroSchema.getTypes().size(); index++) {
+ Schema memberSchema = avroSchema.getTypes().get(index);
+ if (!memberSchema.getType().equals(Schema.Type.NULL)) { // null members get no converter
+ Type memberType = parquetGroup.getType(parquetIndex);
+ memberConverters[parquetIndex] = newConverter(memberSchema, memberType, model, new ParentValueContainer() {
+ @Override
+ void add(Object value) {
+ Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type");
+ memberValue = value;
+ }
+ });
+ parquetIndex++; // Note for nulls the parquetIndex is not increased
+ }
+ }
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return memberConverters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ memberValue = null; // reset before the members of this union value are read
+ }
+
+ @Override
+ public void end() {
+ parent.add(memberValue); // null when no member converter received a value
+ }
+ }
+
+ static final class MapConverter<V> extends GroupConverter { // builds a HashMap with String keys from a map group
+
+ private final ParentValueContainer parent;
+ private final Converter keyValueConverter; // converter for the repeated key/value group (field 0)
+ private Map<String, V> map; // map being built; replaced on every start()
+
+ public MapConverter(ParentValueContainer parent, GroupType mapType,
+ Schema mapSchema, GenericData model) {
+ this.parent = parent;
+ GroupType repeatedKeyValueType = mapType.getType(0).asGroupType();
+ this.keyValueConverter = new MapKeyValueConverter(
+ repeatedKeyValueType, mapSchema, model);
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) { // fieldIndex is ignored: there is a single converter
+ return keyValueConverter;
+ }
+
+ @Override
+ public void start() {
+ this.map = new HashMap<String, V>(); // fresh map for each map value
+ }
+
+ @Override
+ public void end() {
+ parent.add(map); // hand the finished map to the parent
+ }
+
+ final class MapKeyValueConverter extends GroupConverter { // converts one key/value entry and puts it into the enclosing map
+
+ private String key;
+ private V value;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
+
+ public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema,
+ GenericData model) {
+ keyConverter = new PrimitiveConverter() {
+ @Override
+ final public void addBinary(Binary value) {
+ key = value.toStringUsingUTF8(); // keys are decoded as UTF-8 strings
+ }
+ };
+
+ Type valueType = keyValueType.getType(1); // the value is field 1 of the key/value group
+ Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType());
+ valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() {
+ @Override
+ @SuppressWarnings("unchecked")
+ void add(Object value) {
+ MapKeyValueConverter.this.value = (V) value;
+ }
+ });
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ if (fieldIndex == 0) {
+ return keyConverter;
+ } else if (fieldIndex == 1) {
+ return valueConverter;
+ }
+ throw new IllegalArgumentException("only the key (0) and value (1) fields expected: " + fieldIndex);
+ }
+
+ @Override
+ public void start() {
+ key = null; // reset both so nothing carries over from the previous entry
+ value = null;
+ }
+
+ @Override
+ public void end() {
+ map.put(key, value); // value stays null when the value converter received nothing
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java
new file mode 100644
index 0000000..42bfb09
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+/**
+ * A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files.
+ */
+public class AvroParquetInputFormat<T> extends ParquetInputFormat<T> {
+ public AvroParquetInputFormat() {
+ super(AvroReadSupport.class);
+ }
+
+ /**
+ * Set the subset of columns to read (projection pushdown). Specified as an Avro
+ * schema, the requested projection is converted into a Parquet schema for Parquet
+ * column projection.
+ * <p>
+ * This is useful if the full schema is large and you only want to read a few
+ * columns, since it saves time by not reading unused columns.
+ * <p>
+ * If a requested projection is set, then the Avro schema used for reading
+ * must be compatible with the projection. For instance, if a column is not included
+ * in the projection then it must either not be included or be optional in the read
+ * schema. Use {@link #setAvroReadSchema(org.apache.hadoop.mapreduce.Job,
+ * org.apache.avro.Schema)} to set a read schema, if needed.
+ * @param job
+ * @param requestedProjection
+ * @see #setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ */
+ public static void setRequestedProjection(Job job, Schema requestedProjection) {
+ AvroReadSupport.setRequestedProjection(ContextUtil.getConfiguration(job),
+ requestedProjection);
+ }
+
+ /**
+ * Override the Avro schema to use for reading. If not set, the Avro schema used for
+ * writing is used.
+ * <p>
+ * Differences between the read and write schemas are resolved using
+ * <a href="http://avro.apache.org/docs/current/spec.html#Schema+Resolution">Avro's schema resolution rules</a>.
+ * @param job
+ * @param avroReadSchema
+ * @see #setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ */
+ public static void setAvroReadSchema(Job job, Schema avroReadSchema) {
+ AvroReadSupport.setAvroReadSchema(ContextUtil.getConfiguration(job), avroReadSchema);
+ }
+
+ /**
+ * Uses an instance of the specified {@link AvroDataSupplier} class to control how the
+ * {@link org.apache.avro.specific.SpecificData} instance that is used to find
+ * Avro specific records is created.
+ * @param job
+ * @param supplierClass
+ */
+ public static void setAvroDataSupplier(Job job,
+ Class<? extends AvroDataSupplier> supplierClass) {
+ AvroReadSupport.setAvroDataSupplier(ContextUtil.getConfiguration(job), supplierClass);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
new file mode 100644
index 0000000..afca74f
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.avro.AvroWriteSupport;
+import org.apache.parquet.hadoop.ParquetOutputFormat;
+import org.apache.parquet.hadoop.util.ContextUtil;
+
+/**
+ * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files.
+ */
+public class AvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
+
+ /**
+ * Set the Avro schema to use for writing. The schema is translated into a Parquet
+ * schema so that the records can be written in Parquet format. It is also
+ * stored in the Parquet metadata so that records can be reconstructed as Avro
+ * objects at read time without specifying a read schema.
+ * @param job
+ * @param schema
+ * @see org.apache.parquet.avro.AvroParquetInputFormat#setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ */
+ public static void setSchema(Job job, Schema schema) {
+ AvroWriteSupport.setSchema(ContextUtil.getConfiguration(job), schema);
+ }
+
+ public AvroParquetOutputFormat() {
+ super(new AvroWriteSupport());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
new file mode 100644
index 0000000..40cf5eb
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.filter.UnboundRecordFilter;
+import org.apache.parquet.hadoop.ParquetReader;
+
+/**
+ * Read Avro records from a Parquet file.
+ */
+public class AvroParquetReader<T extends IndexedRecord> extends ParquetReader<T> {
+
+ public static <T extends IndexedRecord> Builder<T> builder(Path file) {
+ return ParquetReader.builder(new AvroReadSupport<T>(), file);
+ }
+
+ /**
+ * @deprecated use {@link #builder(Path)}
+ */
+ @Deprecated
+ public AvroParquetReader(Path file) throws IOException {
+ super(file, new AvroReadSupport<T>());
+ }
+
+ /**
+ * @deprecated use {@link #builder(Path)}
+ */
+ @Deprecated
+ public AvroParquetReader(Path file, UnboundRecordFilter unboundRecordFilter) throws IOException {
+ super(file, new AvroReadSupport<T>(), unboundRecordFilter);
+ }
+
+ /**
+ * @deprecated use {@link #builder(Path)}
+ */
+ @Deprecated
+ public AvroParquetReader(Configuration conf, Path file) throws IOException {
+ super(conf, file, new AvroReadSupport<T>());
+ }
+
+ /**
+ * @deprecated use {@link #builder(Path)}
+ */
+ @Deprecated
+ public AvroParquetReader(Configuration conf, Path file, UnboundRecordFilter unboundRecordFilter) throws IOException {
+ super(conf, file, new AvroReadSupport<T>(), unboundRecordFilter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
new file mode 100644
index 0000000..afa2c6d
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.io.IOException;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+/**
+ * Write Avro records to a Parquet file.
+ */
+public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T> {
+
+ /** Create a new {@link AvroParquetWriter}.
+ *
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
+ * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+ * @param blockSize the block size threshold.
+ * @param pageSize the page size handed to {@link ParquetWriter}.
+ * @throws IOException if the {@link ParquetWriter} constructor throws it
+ */
+ public AvroParquetWriter(Path file, Schema avroSchema,
+ CompressionCodecName compressionCodecName, int blockSize,
+ int pageSize) throws IOException {
+ super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+ compressionCodecName, blockSize, pageSize);
+ }
+
+ /** Create a new {@link AvroParquetWriter}.
+ *
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
+ * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+ * @param blockSize the block size threshold.
+ * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+ * @param enableDictionary Whether to use a dictionary to compress columns.
+ * @throws IOException if the {@link ParquetWriter} constructor throws it
+ */
+ public AvroParquetWriter(Path file, Schema avroSchema,
+ CompressionCodecName compressionCodecName, int blockSize,
+ int pageSize, boolean enableDictionary) throws IOException {
+ super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+ compressionCodecName, blockSize, pageSize, enableDictionary,
+ DEFAULT_IS_VALIDATING_ENABLED); // validation is not configurable here; the inherited default is used
+ }
+
+ /** Create a new {@link AvroParquetWriter}. The default block size is 50 MB. The default
+ * page size is 1 MB. Default compression is no compression. (Inherited from {@link ParquetWriter})
+ *
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
+ * @throws IOException if the delegated constructor throws it
+ */
+ public AvroParquetWriter(Path file, Schema avroSchema) throws IOException {
+ this(file, avroSchema, CompressionCodecName.UNCOMPRESSED,
+ DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); // NOTE(review): confirm the 50 MB / 1 MB figures above against these constants
+ }
+
+ /** Create a new {@link AvroParquetWriter}.
+ *
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
+ * @param compressionCodecName Compression codec to use, or CompressionCodecName.UNCOMPRESSED
+ * @param blockSize the block size threshold.
+ * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+ * @param enableDictionary Whether to use a dictionary to compress columns.
+ * @param conf The Configuration to use.
+ * @throws IOException if the {@link ParquetWriter} constructor throws it
+ */
+ public AvroParquetWriter(Path file, Schema avroSchema,
+ CompressionCodecName compressionCodecName,
+ int blockSize, int pageSize, boolean enableDictionary,
+ Configuration conf) throws IOException {
+ super(file, AvroParquetWriter.<T>writeSupport(avroSchema),
+ compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, // NOTE(review): pageSize is passed twice; presumably the second slot is the dictionary page size - confirm against ParquetWriter
+ DEFAULT_IS_VALIDATING_ENABLED, DEFAULT_WRITER_VERSION, conf);
+ }
+
+ @SuppressWarnings("unchecked") // covers the cast of the non-generic AvroWriteSupport to WriteSupport<T>
+ private static <T> WriteSupport<T> writeSupport(Schema avroSchema) {
+ return (WriteSupport<T>) new AvroWriteSupport(
+ new AvroSchemaConverter().convert(avroSchema), avroSchema); // the Parquet schema is derived from the Avro schema
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
new file mode 100644
index 0000000..9f1ba46
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.parquet.hadoop.api.ReadSupport;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Avro implementation of {@link ReadSupport} for Avro {@link IndexedRecord}s which cover both Avro Specific and
+ * Generic. Users should use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using
+ * this class directly.
+ */
+public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
+
+ public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
+ private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
+
+ static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema";
+ // older files were written with the schema in this metadata key
+ static final String OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema";
+ private static final String AVRO_READ_SCHEMA_METADATA_KEY = "avro.read.schema";
+
+ public static String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier";
+
+ /** Stores {@code requestedProjection.toString()} in {@code configuration} under {@link #AVRO_REQUESTED_PROJECTION}.
+ * @see org.apache.parquet.avro.AvroParquetInputFormat#setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ */
+ public static void setRequestedProjection(Configuration configuration, Schema requestedProjection) {
+ configuration.set(AVRO_REQUESTED_PROJECTION, requestedProjection.toString());
+ }
+
+ /** Stores {@code avroReadSchema.toString()} in {@code configuration} under the private AVRO_READ_SCHEMA key.
+ * @see org.apache.parquet.avro.AvroParquetInputFormat#setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ */
+ public static void setAvroReadSchema(Configuration configuration, Schema avroReadSchema) {
+ configuration.set(AVRO_READ_SCHEMA, avroReadSchema.toString());
+ }
+
+ public static void setAvroDataSupplier(Configuration configuration,
+ Class<? extends AvroDataSupplier> clazz) {
+ configuration.set(AVRO_DATA_SUPPLIER, clazz.getName()); // only the class name is stored; prepareForRead instantiates it
+ }
+
+ @Override
+ public ReadContext init(Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema) {
+ MessageType projection = fileSchema;
+ Map<String, String> metadata = null;
+
+ String requestedProjectionString = configuration.get(AVRO_REQUESTED_PROJECTION);
+ if (requestedProjectionString != null) {
+ Schema avroRequestedProjection = new Schema.Parser().parse(requestedProjectionString);
+ projection = new AvroSchemaConverter(configuration).convert(avroRequestedProjection);
+ }
+ String avroReadSchema = configuration.get(AVRO_READ_SCHEMA);
+ if (avroReadSchema != null) {
+ metadata = new LinkedHashMap<String, String>();
+ metadata.put(AVRO_READ_SCHEMA_METADATA_KEY, avroReadSchema);
+ }
+ return new ReadContext(projection, metadata);
+ }
+
+ @Override
+ public RecordMaterializer<T> prepareForRead(
+ Configuration configuration, Map<String, String> keyValueMetaData,
+ MessageType fileSchema, ReadContext readContext) {
+ MessageType parquetSchema = readContext.getRequestedSchema();
+ Schema avroSchema;
+ if (readContext.getReadSupportMetadata() != null &&
+ readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) {
+ // use the Avro read schema provided by the user
+ avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY));
+ } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) {
+ // use the Avro schema from the file metadata if present
+ avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY));
+ } else if (keyValueMetaData.get(OLD_AVRO_SCHEMA_METADATA_KEY) != null) {
+ // use the Avro schema from the file metadata if present
+ avroSchema = new Schema.Parser().parse(keyValueMetaData.get(OLD_AVRO_SCHEMA_METADATA_KEY));
+ } else {
+ // default to converting the Parquet schema into an Avro schema
+ avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
+ }
+ Class<? extends AvroDataSupplier> suppClass = configuration.getClass(AVRO_DATA_SUPPLIER,
+ SpecificDataSupplier.class,
+ AvroDataSupplier.class);
+ AvroDataSupplier supplier =ReflectionUtils.newInstance(suppClass, configuration);
+ return new AvroRecordMaterializer<T>(parquetSchema, avroSchema, supplier.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
new file mode 100644
index 0000000..1794929
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.RecordMaterializer;
+import org.apache.parquet.schema.MessageType;
+
+class AvroRecordMaterializer<T extends IndexedRecord> extends RecordMaterializer<T> {
+ // Both overrides below delegate to this converter; it is assigned once and never replaced.
+ private final AvroIndexedRecordConverter<T> root;
+
+ public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema,
+ GenericData baseModel) {
+ this.root = new AvroIndexedRecordConverter<T>(requestedSchema, avroSchema, baseModel);
+ }
+
+ @Override
+ public T getCurrentRecord() {
+ return root.getCurrentRecord(); // the root converter owns the current record
+ }
+
+ @Override
+ public GroupConverter getRootConverter() {
+ return root; // the same converter instance is returned on every call
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
new file mode 100644
index 0000000..19aa2b3
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.util.*;
+
+import org.apache.avro.Schema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.codehaus.jackson.node.NullNode;
+import org.apache.parquet.schema.ConversionPatterns;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import static org.apache.parquet.schema.OriginalType.*;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
+
+/**
+ * <p>
+ * Converts an Avro schema into a Parquet schema. See package documentation for details
+ * of the mapping.
+ * </p>
+ */
+public class AvroSchemaConverter {
+
+ static final String ADD_LIST_ELEMENT_RECORDS =
+ "parquet.avro.add-list-element-records";
+ private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true;
+
+ private final boolean assumeRepeatedIsListElement;
+
+ public AvroSchemaConverter() { // no Configuration to consult: the built-in default is used
+ this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT;
+ }
+
+ public AvroSchemaConverter(Configuration conf) { // reads the ADD_LIST_ELEMENT_RECORDS flag, falling back to the default
+ this.assumeRepeatedIsListElement = conf.getBoolean(
+ ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT);
+ }
+
+ /**
+ * Given a schema, check to see if it is a union of a null type and a regular schema,
+ * and then return the non-null sub-schema. Otherwise, return the given schema.
+ *
+ * @param schema The schema to check
+ * @return The non-null portion of a union schema, or the given schema
+ */
+ public static Schema getNonNull(Schema schema) {
+ if (schema.getType().equals(Schema.Type.UNION)) {
+ List<Schema> schemas = schema.getTypes();
+ if (schemas.size() == 2) {
+ if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
+ return schemas.get(1);
+ } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
+ return schemas.get(0);
+ } else {
+ return schema;
+ }
+ } else {
+ return schema;
+ }
+ } else {
+ return schema;
+ }
+ }
+
+ public MessageType convert(Schema avroSchema) { // Avro record schema -> Parquet message type
+ if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
+ throw new IllegalArgumentException("Avro schema must be a record.");
+ }
+ return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields())); // the message is named after the record's full name
+ }
+
+ private List<Type> convertFields(List<Schema.Field> fields) {
+ List<Type> converted = new ArrayList<Type>();
+ for (Schema.Field avroField : fields) {
+ // Avro nulls are not encoded, unless they are null unions
+ if (!avroField.schema().getType().equals(Schema.Type.NULL)) {
+ converted.add(convertField(avroField));
+ }
+ }
+ return converted;
+ }
+
+ private Type convertField(String fieldName, Schema schema) { // convenience overload: the field starts out REQUIRED
+ return convertField(fieldName, schema, Type.Repetition.REQUIRED);
+ }
+
+ private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) { // maps one Avro schema to its Parquet type
+ Schema.Type type = schema.getType();
+ if (type.equals(Schema.Type.BOOLEAN)) {
+ return primitive(fieldName, BOOLEAN, repetition);
+ } else if (type.equals(Schema.Type.INT)) {
+ return primitive(fieldName, INT32, repetition);
+ } else if (type.equals(Schema.Type.LONG)) {
+ return primitive(fieldName, INT64, repetition);
+ } else if (type.equals(Schema.Type.FLOAT)) {
+ return primitive(fieldName, FLOAT, repetition);
+ } else if (type.equals(Schema.Type.DOUBLE)) {
+ return primitive(fieldName, DOUBLE, repetition);
+ } else if (type.equals(Schema.Type.BYTES)) {
+ return primitive(fieldName, BINARY, repetition);
+ } else if (type.equals(Schema.Type.STRING)) {
+ return primitive(fieldName, BINARY, repetition, UTF8); // strings are binary annotated as UTF8
+ } else if (type.equals(Schema.Type.RECORD)) {
+ return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
+ } else if (type.equals(Schema.Type.ENUM)) {
+ return primitive(fieldName, BINARY, repetition, ENUM); // enums are binary annotated as ENUM
+ } else if (type.equals(Schema.Type.ARRAY)) {
+ return ConversionPatterns.listType(repetition, fieldName,
+ convertField("array", schema.getElementType(), Type.Repetition.REPEATED)); // the repeated element field is always named "array"
+ } else if (type.equals(Schema.Type.MAP)) {
+ Type valType = convertField("value", schema.getValueType());
+ // avro map key type is always string
+ return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
+ } else if (type.equals(Schema.Type.FIXED)) {
+ return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition,
+ schema.getFixedSize(), null); // length comes from the Avro fixed size; no original type is set
+ } else if (type.equals(Schema.Type.UNION)) {
+ return convertUnion(fieldName, schema, repetition);
+ }
+ throw new UnsupportedOperationException("Cannot convert Avro type " + type); // reached for any type not handled above
+ }
+
+ /** Converts an Avro union. Null branches only relax a REQUIRED repetition to OPTIONAL; a single remaining
+ * branch is converted in place, and several branches become a group of OPTIONAL "memberN" fields. */
+ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
+ List<Schema> nonNullSchemas = new ArrayList<Schema>(schema.getTypes().size());
+ for (Schema childSchema : schema.getTypes()) {
+ if (childSchema.getType().equals(Schema.Type.NULL)) {
+ if (Type.Repetition.REQUIRED == repetition) {
+ repetition = Type.Repetition.OPTIONAL;
+ }
+ } else {
+ nonNullSchemas.add(childSchema);
+ }
+ }
+ // If we only get a null and one other type then its a simple optional field
+ // otherwise construct a union container
+ switch (nonNullSchemas.size()) {
+ case 0:
+ throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");
+ case 1:
+ return convertField(fieldName, nonNullSchemas.get(0), repetition);
+ default: // complex union type
+ List<Type> unionTypes = new ArrayList<Type>(nonNullSchemas.size());
+ int index = 0;
+ for (Schema childSchema : nonNullSchemas) {
+ unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL));
+ }
+ return new GroupType(repetition, fieldName, unionTypes);
+ }
+ }
+
+ private Type convertField(Schema.Field field) { // converts a record field under its own name
+ return convertField(field.name(), field.schema());
+ }
+
+ private PrimitiveType primitive(String name, // overload that also takes an explicit type length
+ PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
+ int typeLength, OriginalType originalType) {
+ return new PrimitiveType(repetition, primitive, typeLength, name,
+ originalType);
+ }
+
+ private PrimitiveType primitive(String name, // overload for an annotated primitive without a type length
+ PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
+ OriginalType originalType) {
+ return new PrimitiveType(repetition, primitive, name, originalType);
+ }
+
+ private PrimitiveType primitive(String name, // overload for a primitive without an original type
+ PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) {
+ return new PrimitiveType(repetition, primitive, name, null);
+ }
+
+ public Schema convert(MessageType parquetSchema) { // Parquet message type -> Avro record named after the message
+ return convertFields(parquetSchema.getName(), parquetSchema.getFields());
+ }
+
+ private Schema convertFields(String name, List<Type> parquetFields) {
+ List<Schema.Field> avroFields = new ArrayList<Schema.Field>();
+ for (Type parquetField : parquetFields) {
+ // Convert first, so a conversion failure is reported before the repetition check.
+ Schema converted = convertField(parquetField);
+ if (parquetField.isRepetition(Type.Repetition.REPEATED)) {
+ throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetField);
+ }
+ boolean nullable = parquetField.isRepetition(Type.Repetition.OPTIONAL);
+ avroFields.add(nullable // optional fields become a null-first union defaulting to null
+ ? new Schema.Field(parquetField.getName(), optional(converted), null, NullNode.getInstance())
+ : new Schema.Field(parquetField.getName(), converted, null, null));
+ }
+ Schema record = Schema.createRecord(name, null, null, false);
+ record.setFields(avroFields);
+ return record;
+ }
+
+ private Schema convertField(final Type parquetType) {
+ if (parquetType.isPrimitive()) {
+ final PrimitiveTypeName parquetPrimitiveTypeName =
+ parquetType.asPrimitiveType().getPrimitiveTypeName();
+ final OriginalType originalType = parquetType.getOriginalType();
+ return parquetPrimitiveTypeName.convert(
+ new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
+ @Override
+ public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
+ return Schema.create(Schema.Type.BOOLEAN);
+ }
+ @Override
+ public Schema convertINT32(PrimitiveTypeName primitiveTypeName) {
+ return Schema.create(Schema.Type.INT);
+ }
+ @Override
+ public Schema convertINT64(PrimitiveTypeName primitiveTypeName) {
+ return Schema.create(Schema.Type.LONG);
+ }
+ @Override
+ public Schema convertINT96(PrimitiveTypeName primitiveTypeName) {
+ throw new IllegalArgumentException("INT64 not yet implemented.");
+ }
+ @Override
+ public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) {
+ return Schema.create(Schema.Type.FLOAT);
+ }
+ @Override
+ public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
+ return Schema.create(Schema.Type.DOUBLE);
+ }
+ @Override
+ public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) {
+ int size = parquetType.asPrimitiveType().getTypeLength();
+ return Schema.createFixed(parquetType.getName(), null, null, size);
+ }
+ @Override
+ public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) {
+ if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) {
+ return Schema.create(Schema.Type.STRING);
+ } else {
+ return Schema.create(Schema.Type.BYTES);
+ }
+ }
+ });
+ } else {
+ GroupType parquetGroupType = parquetType.asGroupType();
+ OriginalType originalType = parquetGroupType.getOriginalType();
+ if (originalType != null) {
+ switch(originalType) {
+ case LIST:
+ if (parquetGroupType.getFieldCount()!= 1) {
+ throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
+ }
+ Type repeatedType = parquetGroupType.getType(0);
+ if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
+ throw new UnsupportedOperationException("Invalid list type " + parquetGroupType);
+ }
+ if (isElementType(repeatedType, parquetGroupType.getName())) {
+ // repeated element types are always required
+ return Schema.createArray(convertField(repeatedType));
+ } else {
+ Type elementType = repeatedType.asGroupType().getType(0);
+ if (elementType.isRepetition(Type.Repetition.OPTIONAL)) {
+ return Schema.createArray(optional(convertField(elementType)));
+ } else {
+ return Schema.createArray(convertField(elementType));
+ }
+ }
+ case MAP_KEY_VALUE: // for backward-compatibility
+ case MAP:
+ if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
+ throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
+ }
+ GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
+ if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) ||
+ !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) ||
+ mapKeyValType.getFieldCount()!=2) {
+ throw new UnsupportedOperationException("Invalid map type " + parquetGroupType);
+ }
+ Type keyType = mapKeyValType.getType(0);
+ if (!keyType.isPrimitive() ||
+ !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveTypeName.BINARY) ||
+ !keyType.getOriginalType().equals(OriginalType.UTF8)) {
+ throw new IllegalArgumentException("Map key type must be binary (UTF8): "
+ + keyType);
+ }
+ Type valueType = mapKeyValType.getType(1);
+ if (valueType.isRepetition(Type.Repetition.OPTIONAL)) {
+ return Schema.createMap(optional(convertField(valueType)));
+ } else {
+ return Schema.createMap(convertField(valueType));
+ }
+ case ENUM:
+ return Schema.create(Schema.Type.STRING);
+ case UTF8:
+ default:
+ throw new UnsupportedOperationException("Cannot convert Parquet type " +
+ parquetType);
+
+ }
+ } else {
+ // if no original type then it's a record
+ return convertFields(parquetGroupType.getName(), parquetGroupType.getFields());
+ }
+ }
+ }
+
+ /**
+ * Decides whether the repeated type inside a LIST-annotated group is itself the
+ * list element, following the logical type spec's rules for existing data.
+ * <p>
+ * The AvroArrayConverter will decide whether the repeated type is the array
+ * element type by testing whether the element schema and repeated type are
+ * the same. This ensures that the LIST rules are followed when there is no
+ * schema and that a schema can be provided to override the default behavior.
+ */
+ private boolean isElementType(Type repeatedType, String parentName) {
+ // can't be a synthetic layer because it would be invalid
+ if (repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1) {
+ return true;
+ }
+ // known patterns without the synthetic layer
+ String repeatedName = repeatedType.getName();
+ if (repeatedName.equals("array") || repeatedName.equals(parentName + "_tuple")) {
+ return true;
+ }
+ return assumeRepeatedIsListElement; // default assumption
+ }
+
+ private static Schema optional(Schema original) {
+ // null is first in the union because Parquet's default is always null
+ Schema nullSchema = Schema.create(Schema.Type.NULL);
+ List<Schema> branches = Arrays.asList(nullSchema, original);
+ return Schema.createUnion(branches);
+ }
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
new file mode 100644
index 0000000..2ec8ee1
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+/**
+ * Avro implementation of {@link WriteSupport} for {@link IndexedRecord}s - both Avro Generic and Specific.
+ * Users should use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using
+ * this class directly.
+ */
+public class AvroWriteSupport extends WriteSupport<IndexedRecord> {
+
+ static final String AVRO_SCHEMA = "parquet.avro.schema";
+ private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING);
+
+ private RecordConsumer recordConsumer;
+ private MessageType rootSchema;
+ private Schema rootAvroSchema;
+
+ public AvroWriteSupport() { // leaves both schemas unset; init() then reads the Avro schema from the Configuration
+ }
+
+ public AvroWriteSupport(MessageType schema, Schema avroSchema) { // with both schemas supplied, init() skips the Configuration lookup
+ this.rootSchema = schema;
+ this.rootAvroSchema = avroSchema;
+ }
+
+ /** Stores {@code schema.toString()} in {@code configuration} under the AVRO_SCHEMA key, where {@code init} looks it up.
+ * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema)
+ */
+ public static void setSchema(Configuration configuration, Schema schema) {
+ configuration.set(AVRO_SCHEMA, schema.toString());
+ }
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+ if (rootAvroSchema == null) { // schemas were not supplied through the constructor
+ rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA)); // NOTE(review): presumably fails if setSchema was never called - confirm
+ rootSchema = new AvroSchemaConverter().convert(rootAvroSchema);
+ }
+ Map<String, String> extraMetaData = new HashMap<String, String>();
+ extraMetaData.put(AvroReadSupport.AVRO_SCHEMA_METADATA_KEY, rootAvroSchema.toString()); // the Avro schema is handed to the WriteContext as extra metadata
+ return new WriteContext(rootSchema, extraMetaData);
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer; // every write* method below emits through this consumer
+ }
+
+ @Override
+ public void write(IndexedRecord record) {
+ recordConsumer.startMessage(); // the top-level record is framed as a message, not as a group
+ writeRecordFields(rootSchema, rootAvroSchema, record);
+ recordConsumer.endMessage();
+ }
+
+ private void writeRecord(GroupType schema, Schema avroSchema,
+ IndexedRecord record) {
+ recordConsumer.startGroup(); // a nested record is framed as a group around its fields
+ writeRecordFields(schema, avroSchema, record);
+ recordConsumer.endGroup();
+ }
+
+ private void writeRecordFields(GroupType schema, Schema avroSchema,
+ IndexedRecord record) {
+ List<Type> parquetFields = schema.getFields();
+ int parquetPos = 0; // parquet ignores Avro nulls, so the two positions may differ
+ int avroPos = -1;
+ for (Schema.Field avroField : avroSchema.getFields()) {
+ avroPos++;
+ if (avroField.schema().getType().equals(Schema.Type.NULL)) {
+ continue; // skipped without advancing the Parquet position
+ }
+ Type parquetField = parquetFields.get(parquetPos);
+ Object value = record.get(avroPos);
+ if (value != null) {
+ recordConsumer.startField(parquetField.getName(), parquetPos);
+ writeValue(parquetField, avroField.schema(), value);
+ recordConsumer.endField(parquetField.getName(), parquetPos);
+ } else if (parquetField.isRepetition(Type.Repetition.REQUIRED)) {
+ throw new RuntimeException("Null-value for required field: " + avroField.name());
+ }
+ parquetPos++;
+ }
+ }
+
+ private <T> void writeArray(GroupType schema, Schema avroSchema,
+ Collection<T> array) {
+ recordConsumer.startGroup(); // group wrapper (original type LIST)
+ if (array.size() > 0) { // an empty array writes only the wrapper group
+ recordConsumer.startField("array", 0);
+ for (T elt : array) {
+ writeValue(schema.getType(0), avroSchema.getElementType(), elt); // every element is written inside the one "array" field
+ }
+ recordConsumer.endField("array", 0);
+ }
+ recordConsumer.endGroup();
+ }
+
+ private <V> void writeMap(GroupType schema, Schema avroSchema,
+ Map<CharSequence, V> map) {
+ GroupType innerGroup = schema.getType(0).asGroupType(); // the repeated group nested under the MAP wrapper
+ Type keyType = innerGroup.getType(0);
+ Type valueType = innerGroup.getType(1);
+
+ recordConsumer.startGroup(); // group wrapper (original type MAP)
+ if (map.size() > 0) { // an empty map writes only the wrapper group
+ recordConsumer.startField("map", 0);
+
+ for (Map.Entry<CharSequence, V> entry : map.entrySet()) {
+ recordConsumer.startGroup(); // repeated group key_value, middle layer
+ recordConsumer.startField("key", 0);
+ writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey()); // keys are always written with the string schema
+ recordConsumer.endField("key", 0);
+ V value = entry.getValue();
+ if (value != null) {
+ recordConsumer.startField("value", 1);
+ writeValue(valueType, avroSchema.getValueType(), value);
+ recordConsumer.endField("value", 1);
+ } else if (!valueType.isRepetition(Type.Repetition.OPTIONAL)) { // a null value is only legal for an OPTIONAL value type
+ throw new RuntimeException("Null map value for " + avroSchema.getName());
+ }
+ recordConsumer.endGroup();
+ }
+
+ recordConsumer.endField("map", 0);
+ }
+ recordConsumer.endGroup();
+ }
+
+ /** Writes {@code value} into the "memberN" field of the union group matching its Avro branch. */
+ private void writeUnion(GroupType parquetSchema, Schema avroSchema, Object value) {
+   recordConsumer.startGroup();
+
+   // resolveUnion reports which branch of the Avro union matches the
+   // runtime value that is being serialised.
+   int avroBranch = GenericData.get().resolveUnion(avroSchema, value);
+
+   // NULL branches are skipped on the Parquet side, so the Parquet position
+   // is the number of non-NULL branches that precede the selected one.
+   GroupType unionGroup = parquetSchema.asGroupType();
+   int parquetBranch = 0;
+   for (Schema branch : avroSchema.getTypes().subList(0, avroBranch)) {
+     if (branch.getType() != Schema.Type.NULL) {
+       parquetBranch++;
+     }
+   }
+
+   // Sparse encoding: every union member owns its own set of columns.
+   String fieldName = "member" + parquetBranch;
+   recordConsumer.startField(fieldName, parquetBranch);
+   writeValue(unionGroup.getType(parquetBranch),
+       avroSchema.getTypes().get(avroBranch), value);
+   recordConsumer.endField(fieldName, parquetBranch);
+
+   recordConsumer.endGroup();
+ }
+
+ @SuppressWarnings("unchecked")
+ private void writeValue(Type type, Schema avroSchema, Object value) { // dispatches on Avro type
+   Schema resolved = AvroSchemaConverter.getNonNull(avroSchema); // presumably drops a null branch
+   Schema.Type kind = resolved.getType();
+   if (kind == Schema.Type.BOOLEAN) {
+     recordConsumer.addBoolean((Boolean) value);
+   } else if (kind == Schema.Type.INT) {
+     recordConsumer.addInteger(((Number) value).intValue());
+   } else if (kind == Schema.Type.LONG) {
+     recordConsumer.addLong(((Number) value).longValue());
+   } else if (kind == Schema.Type.FLOAT) {
+     recordConsumer.addFloat(((Number) value).floatValue());
+   } else if (kind == Schema.Type.DOUBLE) {
+     recordConsumer.addDouble(((Number) value).doubleValue());
+   } else if (kind == Schema.Type.BYTES) {
+     recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value));
+   } else if (kind == Schema.Type.STRING) {
+     recordConsumer.addBinary(fromAvroString(value));
+   } else if (kind == Schema.Type.RECORD) {
+     writeRecord((GroupType) type, resolved, (IndexedRecord) value);
+   } else if (kind == Schema.Type.ENUM) {
+     recordConsumer.addBinary(Binary.fromString(value.toString())); // stored as toString() text
+   } else if (kind == Schema.Type.ARRAY) {
+     writeArray((GroupType) type, resolved, (Collection<?>) value);
+   } else if (kind == Schema.Type.MAP) {
+     writeMap((GroupType) type, resolved, (Map<CharSequence, ?>) value); // the unchecked cast
+   } else if (kind == Schema.Type.UNION) {
+     writeUnion((GroupType) type, resolved, value);
+   } else if (kind == Schema.Type.FIXED) {
+     recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes()));
+   } // NOTE(review): no else branch, so any other type (e.g. NULL) writes nothing - confirm intended
+ }
+
+ /** Converts a {@code Utf8} from its bytes and byte length; any other value via {@code toString()}. */
+ private Binary fromAvroString(Object value) {
+   if (!(value instanceof Utf8)) {
+     return Binary.fromString(value.toString());
+   }
+   return Binary.fromByteArray(((Utf8) value).getBytes(), 0, ((Utf8) value).getByteLength());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java b/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java
new file mode 100644
index 0000000..2bd2599
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+ /** An {@link AvroDataSupplier} whose {@code get()} returns {@code SpecificData.get()}. */
+ class SpecificDataSupplier implements AvroDataSupplier {
+   @Override public GenericData get() {
+     return SpecificData.get();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java b/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java
new file mode 100644
index 0000000..e5e0475
--- /dev/null
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ *
+ * <p>
+ * Provides classes to store Avro data in Parquet files. Avro schemas are converted to
+ * parquet schemas as follows. Only record schemas are converted,
+ * other top-level schema types are not converted and attempting to do so will result
+ * in an error. Avro types are converted to Parquet types using the mapping shown here:
+ * </p>
+ *
+ * <table>
+ * <tr>
+ * <th>Avro type</th>
+ * <th>Parquet type</th>
+ * </tr>
+ * <tr>
+ * <td>null</td>
+ * <td>no type (the field is not encoded in Parquet), unless a null union</td>
+ * </tr>
+ * <tr>
+ * <td>boolean</td>
+ * <td>boolean</td>
+ * </tr>
+ * <tr>
+ * <td>int</td>
+ * <td>int32</td>
+ * </tr>
+ * <tr>
+ * <td>long</td>
+ * <td>int64</td>
+ * </tr>
+ * <tr>
+ * <td>float</td>
+ * <td>float</td>
+ * </tr>
+ * <tr>
+ * <td>double</td>
+ * <td>double</td>
+ * </tr>
+ * <tr>
+ * <td>bytes</td>
+ * <td>binary</td>
+ * </tr>
+ * <tr>
+ * <td>string</td>
+ * <td>binary (with original type UTF8)</td>
+ * </tr>
+ * <tr>
+ * <td>record</td>
+ * <td>group containing nested fields</td>
+ * </tr>
+ * <tr>
+ * <td>enum</td>
+ * <td>binary (with original type ENUM)</td>
+ * </tr>
+ * <tr>
+ * <td>array</td>
+ * <td>group (with original type LIST) containing one repeated group field</td>
+ * </tr>
+ * <tr>
+ * <td>map</td>
+ * <td>group (with original type MAP) containing one repeated group
+ * field (with original type MAP_KEY_VALUE) of (key, value)</td>
+ * </tr>
+ * <tr>
+ * <td>fixed</td>
+ * <td>fixed_len_byte_array</td>
+ * </tr>
+ * <tr>
+ * <td>union</td>
+ * <td>an optional type, in the case of a null union; otherwise a group with one field per non-null member (member0, member1, ...)</td>
+ * </tr>
+ * </table>
+ *
+ * <p>
+ * For Parquet files that were not written with classes from this package there is no
+ * Avro write schema stored in the Parquet file metadata. To read such files using
+ * classes from this package you must either provide an Avro read schema,
+ * or a default Avro schema will be derived using the following mapping.
+ * </p>
+ * <table>
+ * <tr>
+ * <th>Parquet type</th>
+ * <th>Avro type</th>
+ * </tr>
+ * <tr>
+ * <td>boolean</td>
+ * <td>boolean</td>
+ * </tr>
+ * <tr>
+ * <td>int32</td>
+ * <td>int</td>
+ * </tr>
+ * <tr>
+ * <td>int64</td>
+ * <td>long</td>
+ * </tr>
+ * <tr>
+ * <td>int96</td>
+ * <td>not supported</td>
+ * </tr>
+ * <tr>
+ * <td>float</td>
+ * <td>float</td>
+ * </tr>
+ * <tr>
+ * <td>double</td>
+ * <td>double</td>
+ * </tr>
+ * <tr>
+ * <td>fixed_len_byte_array</td>
+ * <td>fixed</td>
+ * </tr>
+ * <tr>
+ * <td>binary (with no original type)</td>
+ * <td>bytes</td>
+ * </tr>
+ * <tr>
+ * <td>binary (with original type UTF8)</td>
+ * <td>string</td>
+ * </tr>
+ * <tr>
+ * <td>binary (with original type ENUM)</td>
+ * <td>string</td>
+ * </tr>
+ * <tr>
+ * <td>group (with original type LIST) containing one repeated group field</td>
+ * <td>array</td>
+ * </tr>
+ * <tr>
+ * <td>group (with original type MAP) containing one repeated group
+ * field (with original type MAP_KEY_VALUE) of (key, value)</td>
+ * <td>map</td>
+ * </tr>
+ * </table>
+ *
+ * <p>
+ * Parquet fields that are optional are mapped to an Avro null union.
+ * </p>
+ *
+ * <p>
+ * Some conversions are lossy. Avro nulls are not represented in Parquet,
+ * so they are lost when converted back to Avro. Similarly, a Parquet enum does not
+ * store its values, so it cannot be converted back to an Avro enum,
+ * which is why an Avro string had to suffice. Type names for nested records, enums,
+ * and fixed types are lost in the conversion to Parquet.
+ * Avro aliases, default values, field ordering, and documentation strings are all
+ * dropped in the conversion to Parquet.
+ *
+ * Parquet maps can have any type for keys, but this is not true in Avro where map keys
+ * are assumed to be strings.
+ * </p>
+ */
+package org.apache.parquet.avro;
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
----------------------------------------------------------------------
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java b/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
deleted file mode 100644
index c6c04ab..0000000
--- a/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package parquet.avro;
-
-import org.apache.avro.generic.GenericData;
-
-/**
- * Allows clients to control how the classes associated
- * with specific Avro records are managed and found, e.g.,
- * by creating an instance of {@code GenericData} that
- * uses a particular {@code ClassLoader}.
- */
-public interface AvroDataSupplier {
- GenericData get();
-}
|