Return-Path: X-Original-To: apmail-parquet-commits-archive@minotaur.apache.org Delivered-To: apmail-parquet-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E373176CE for ; Mon, 27 Apr 2015 23:12:01 +0000 (UTC) Received: (qmail 49137 invoked by uid 500); 27 Apr 2015 23:12:01 -0000 Delivered-To: apmail-parquet-commits-archive@parquet.apache.org Received: (qmail 49084 invoked by uid 500); 27 Apr 2015 23:12:01 -0000 Mailing-List: contact commits-help@parquet.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.incubator.apache.org Delivered-To: mailing list commits@parquet.incubator.apache.org Received: (qmail 48044 invoked by uid 99); 27 Apr 2015 23:11:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2015 23:11:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 36C3FE30F2; Mon, 27 Apr 2015 23:11:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: blue@apache.org To: commits@parquet.apache.org Date: Mon, 27 Apr 2015 23:12:47 -0000 Message-Id: <68059a8321b64d69b99bddac8c37e3f0@git.apache.org> In-Reply-To: <190ba0de36204468a8a5a0e9d143ae5d@git.apache.org> References: <190ba0de36204468a8a5a0e9d143ae5d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [50/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet. 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java new file mode 100644 index 0000000..f76f367 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java @@ -0,0 +1,700 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.avro; + +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericArray; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.specific.SpecificData; +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.io.InvalidRecordException; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.PrimitiveConverter; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.Type; + +class AvroIndexedRecordConverter extends GroupConverter { + + private final ParentValueContainer parent; + protected T currentRecord; + private final Converter[] converters; + + private final Schema avroSchema; + private final Class specificClass; + + private final GenericData model; + private final Map recordDefaults = new HashMap(); + + public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) { + this(parquetSchema, avroSchema, SpecificData.get()); + } + + public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema, + GenericData baseModel) { + this(null, parquetSchema, avroSchema, baseModel); + } + + public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType + parquetSchema, Schema avroSchema) { + this(parent, parquetSchema, avroSchema, SpecificData.get()); + } + + public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType + parquetSchema, Schema avroSchema, GenericData baseModel) { + this.parent = parent; + this.avroSchema = avroSchema; + int schemaSize = parquetSchema.getFieldCount(); + this.converters = new 
Converter[schemaSize]; + this.specificClass = baseModel instanceof SpecificData ? + ((SpecificData) baseModel).getClass(avroSchema) : null; + + this.model = this.specificClass == null ? GenericData.get() : baseModel; + + Map avroFieldIndexes = new HashMap(); + int avroFieldIndex = 0; + for (Schema.Field field: avroSchema.getFields()) { + avroFieldIndexes.put(field.name(), avroFieldIndex++); + } + int parquetFieldIndex = 0; + for (Type parquetField: parquetSchema.getFields()) { + Schema.Field avroField = getAvroField(parquetField.getName()); + Schema nonNullSchema = AvroSchemaConverter.getNonNull(avroField.schema()); + final int finalAvroIndex = avroFieldIndexes.remove(avroField.name()); + converters[parquetFieldIndex++] = newConverter(nonNullSchema, parquetField, model, new ParentValueContainer() { + @Override + void add(Object value) { + AvroIndexedRecordConverter.this.set(finalAvroIndex, value); + } + }); + } + // store defaults for any new Avro fields from avroSchema that are not in the writer schema (parquetSchema) + for (String fieldName : avroFieldIndexes.keySet()) { + Schema.Field field = avroSchema.getField(fieldName); + if (field.schema().getType() == Schema.Type.NULL) { + continue; // skip null since Parquet does not write nulls + } + if (field.defaultValue() == null || model.getDefaultValue(field) == null) { + continue; // field has no default + } + recordDefaults.put(field, model.getDefaultValue(field)); + } + } + + private Schema.Field getAvroField(String parquetFieldName) { + Schema.Field avroField = avroSchema.getField(parquetFieldName); + for (Schema.Field f : avroSchema.getFields()) { + if (f.aliases().contains(parquetFieldName)) { + return f; + } + } + if (avroField == null) { + throw new InvalidRecordException(String.format("Parquet/Avro schema mismatch. 
Avro field '%s' not found.", + parquetFieldName)); + } + return avroField; + } + + private static Converter newConverter(Schema schema, Type type, + GenericData model, ParentValueContainer parent) { + if (schema.getType().equals(Schema.Type.BOOLEAN)) { + return new FieldBooleanConverter(parent); + } else if (schema.getType().equals(Schema.Type.INT)) { + return new FieldIntegerConverter(parent); + } else if (schema.getType().equals(Schema.Type.LONG)) { + return new FieldLongConverter(parent); + } else if (schema.getType().equals(Schema.Type.FLOAT)) { + return new FieldFloatConverter(parent); + } else if (schema.getType().equals(Schema.Type.DOUBLE)) { + return new FieldDoubleConverter(parent); + } else if (schema.getType().equals(Schema.Type.BYTES)) { + return new FieldBytesConverter(parent); + } else if (schema.getType().equals(Schema.Type.STRING)) { + return new FieldStringConverter(parent, type.getOriginalType() == OriginalType.UTF8); + } else if (schema.getType().equals(Schema.Type.RECORD)) { + return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema, model); + } else if (schema.getType().equals(Schema.Type.ENUM)) { + return new FieldEnumConverter(parent, schema, model); + } else if (schema.getType().equals(Schema.Type.ARRAY)) { + return new AvroArrayConverter(parent, type.asGroupType(), schema, model); + } else if (schema.getType().equals(Schema.Type.MAP)) { + return new MapConverter(parent, type.asGroupType(), schema, model); + } else if (schema.getType().equals(Schema.Type.UNION)) { + return new AvroUnionConverter(parent, type, schema, model); + } else if (schema.getType().equals(Schema.Type.FIXED)) { + return new FieldFixedConverter(parent, schema, model); + } + throw new UnsupportedOperationException(String.format("Cannot convert Avro type: %s" + + " (Parquet type: %s) ", schema, type)); + } + + private void set(int index, Object value) { + this.currentRecord.put(index, value); + } + + @Override + public Converter getConverter(int fieldIndex) 
{ + return converters[fieldIndex]; + } + + @Override + public void start() { + // Should do the right thing whether it is generic or specific + this.currentRecord = (T) ((this.specificClass == null) ? + new GenericData.Record(avroSchema) : + ((SpecificData) model).newInstance(specificClass, avroSchema)); + } + + @Override + public void end() { + fillInDefaults(); + if (parent != null) { + parent.add(currentRecord); + } + } + + private void fillInDefaults() { + for (Map.Entry entry : recordDefaults.entrySet()) { + Schema.Field f = entry.getKey(); + // replace following with model.deepCopy once AVRO-1455 is being used + Object defaultValue = deepCopy(f.schema(), entry.getValue()); + this.currentRecord.put(f.pos(), defaultValue); + } + } + + private Object deepCopy(Schema schema, Object value) { + switch (schema.getType()) { + case BOOLEAN: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + return value; + default: + return model.deepCopy(schema, value); + } + } + + T getCurrentRecord() { + return currentRecord; + } + + static abstract class ParentValueContainer { + + /** + * Adds the value to the parent. 
+ */ + abstract void add(Object value); + + } + + static final class FieldBooleanConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + + public FieldBooleanConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBoolean(boolean value) { + parent.add(value); + } + + } + + static final class FieldIntegerConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + + public FieldIntegerConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(value); + } + + } + + static final class FieldLongConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + + public FieldLongConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(Long.valueOf(value)); + } + + @Override + final public void addLong(long value) { + parent.add(value); + } + + } + + static final class FieldFloatConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + + public FieldFloatConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(Float.valueOf(value)); + } + + @Override + final public void addLong(long value) { + parent.add(Float.valueOf(value)); + } + + @Override + final public void addFloat(float value) { + parent.add(value); + } + + } + + static final class FieldDoubleConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + + public FieldDoubleConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addInt(int value) { + parent.add(Double.valueOf(value)); + } + + @Override + final public void addLong(long value) { + parent.add(Double.valueOf(value)); + } + + @Override + final public void addFloat(float value) { + 
parent.add(Double.valueOf(value)); + } + + @Override + final public void addDouble(double value) { + parent.add(value); + } + + } + + static final class FieldBytesConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + + public FieldBytesConverter(ParentValueContainer parent) { + this.parent = parent; + } + + @Override + final public void addBinary(Binary value) { + parent.add(ByteBuffer.wrap(value.getBytes())); + } + + } + + static final class FieldStringConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + private final boolean dictionarySupport; + private String[] dict; + + public FieldStringConverter(ParentValueContainer parent, boolean dictionarySupport) { + this.parent = parent; + this.dictionarySupport = dictionarySupport; + } + + @Override + final public void addBinary(Binary value) { + parent.add(value.toStringUsingUTF8()); + } + + @Override + public boolean hasDictionarySupport() { + return dictionarySupport; + } + + @Override + public void setDictionary(Dictionary dictionary) { + dict = new String[dictionary.getMaxId() + 1]; + for (int i = 0; i <= dictionary.getMaxId(); i++) { + dict[i] = dictionary.decodeToBinary(i).toStringUsingUTF8(); + } + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + parent.add(dict[dictionaryId]); + } + } + + static final class FieldEnumConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + private final Class enumClass; + + public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema, + GenericData model) { + this.parent = parent; + this.enumClass = model instanceof SpecificData ? 
+ ((SpecificData) model).getClass(enumSchema) : + SpecificData.get().getClass(enumSchema); + } + + @Override + final public void addBinary(Binary value) { + Object enumValue = value.toStringUsingUTF8(); + if (enumClass != null) { + enumValue = (Enum.valueOf(enumClass,(String)enumValue)); + } + parent.add(enumValue); + } + } + + static final class FieldFixedConverter extends PrimitiveConverter { + + private final ParentValueContainer parent; + private final Schema avroSchema; + private final Class fixedClass; + private final Constructor fixedClassCtor; + + public FieldFixedConverter(ParentValueContainer parent, Schema avroSchema, + GenericData model) { + this.parent = parent; + this.avroSchema = avroSchema; + this.fixedClass = model instanceof SpecificData ? + ((SpecificData) model).getClass(avroSchema) : + SpecificData.get().getClass(avroSchema); + if (fixedClass != null) { + try { + this.fixedClassCtor = + fixedClass.getConstructor(new Class[] { byte[].class }); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + this.fixedClassCtor = null; + } + } + + @Override + final public void addBinary(Binary value) { + if (fixedClass == null) { + parent.add(new GenericData.Fixed(avroSchema, value.getBytes())); + } else { + if (fixedClassCtor == null) { + throw new IllegalArgumentException( + "fixedClass specified but fixedClassCtor is null."); + } + try { + Object fixed = fixedClassCtor.newInstance(value.getBytes()); + parent.add(fixed); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + } + + /** + * Converter for a list. + * + *
+   *   optional group the_list (LIST) { <-- this layer
+   *     repeated group array {
+   *       optional (type) element;
+   *     }
+   *   }
+   * 
+ * + * This class also implements LIST element backward-compatibility rules. + * + * @param The type of elements in the list + */ + static final class AvroArrayConverter extends GroupConverter { + + private final ParentValueContainer parent; + private final Schema avroSchema; + private final Converter converter; + private GenericArray array; + + public AvroArrayConverter(ParentValueContainer parent, GroupType type, + Schema avroSchema, GenericData model) { + this.parent = parent; + this.avroSchema = avroSchema; + Schema elementSchema = this.avroSchema.getElementType(); + Type repeatedType = type.getType(0); + // always determine whether the repeated type is the element type by + // matching it against the element schema. + if (isElementType(repeatedType, elementSchema)) { + // the element type is the repeated type (and required) + converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + void add(Object value) { + array.add((T) value); + } + }); + } else { + // the element is wrapped in a synthetic group and may be optional + converter = new ElementConverter(repeatedType.asGroupType(), elementSchema, model); + } + } + + @Override + public Converter getConverter(int fieldIndex) { + return converter; + } + + @Override + public void start() { + array = new GenericData.Array(0, avroSchema); + } + + @Override + public void end() { + parent.add(array); + } + + /** + * Returns whether the given type is the element type of a list or is a + * synthetic group with one field that is the element type. This is + * determined by checking whether the type can be a synthetic group and by + * checking whether a potential synthetic group matches the expected schema. + *

+ * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this + * method never guesses because the expected schema is known. + * + * @param repeatedType a type that may be the element type + * @param elementSchema the expected Schema for list elements + * @return {@code true} if the repeatedType is the element schema + */ + static boolean isElementType(Type repeatedType, Schema elementSchema) { + if (repeatedType.isPrimitive() || + repeatedType.asGroupType().getFieldCount() > 1) { + // The repeated type must be the element type because it is an invalid + // synthetic wrapper (must be a group with one field). + return true; + } else if (elementSchema != null && + elementSchema.getType() == Schema.Type.RECORD && + elementSchema.getFields().size() == 1 && + elementSchema.getFields().get(0).name().equals( + repeatedType.asGroupType().getFieldName(0))) { + // The repeated type must be the element type because it matches the + // structure of the Avro element's schema. + return true; + } + return false; + } + + /** + * Converter for list elements. + * + *

+     *   optional group the_list (LIST) {
+     *     repeated group array { <-- this layer
+     *       optional (type) element;
+     *     }
+     *   }
+     * 
+ */ + final class ElementConverter extends GroupConverter { + private T element; + private final Converter elementConverter; + + public ElementConverter(GroupType repeatedType, Schema elementSchema, GenericData model) { + Type elementType = repeatedType.getType(0); + Schema nonNullElementSchema = AvroSchemaConverter.getNonNull(elementSchema); + this.elementConverter = newConverter(nonNullElementSchema, elementType, model, new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + void add(Object value) { + ElementConverter.this.element = (T) value; + } + }); + } + + @Override + public Converter getConverter(int fieldIndex) { + Preconditions.checkArgument( + fieldIndex == 0, "Illegal field index: " + fieldIndex); + return elementConverter; + } + + @Override + public void start() { + element = null; + } + + @Override + public void end() { + array.add(element); + } + } + } + + static final class AvroUnionConverter extends GroupConverter { + + private final ParentValueContainer parent; + private final Converter[] memberConverters; + private Object memberValue = null; + + public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema, + Schema avroSchema, GenericData model) { + this.parent = parent; + GroupType parquetGroup = parquetSchema.asGroupType(); + this.memberConverters = new Converter[ parquetGroup.getFieldCount()]; + + int parquetIndex = 0; + for (int index = 0; index < avroSchema.getTypes().size(); index++) { + Schema memberSchema = avroSchema.getTypes().get(index); + if (!memberSchema.getType().equals(Schema.Type.NULL)) { + Type memberType = parquetGroup.getType(parquetIndex); + memberConverters[parquetIndex] = newConverter(memberSchema, memberType, model, new ParentValueContainer() { + @Override + void add(Object value) { + Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type"); + memberValue = value; + } + }); + parquetIndex++; // Note for nulls the parquetIndex id not increased + } + } + } + 
+ @Override + public Converter getConverter(int fieldIndex) { + return memberConverters[fieldIndex]; + } + + @Override + public void start() { + memberValue = null; + } + + @Override + public void end() { + parent.add(memberValue); + } + } + + static final class MapConverter extends GroupConverter { + + private final ParentValueContainer parent; + private final Converter keyValueConverter; + private Map map; + + public MapConverter(ParentValueContainer parent, GroupType mapType, + Schema mapSchema, GenericData model) { + this.parent = parent; + GroupType repeatedKeyValueType = mapType.getType(0).asGroupType(); + this.keyValueConverter = new MapKeyValueConverter( + repeatedKeyValueType, mapSchema, model); + } + + @Override + public Converter getConverter(int fieldIndex) { + return keyValueConverter; + } + + @Override + public void start() { + this.map = new HashMap(); + } + + @Override + public void end() { + parent.add(map); + } + + final class MapKeyValueConverter extends GroupConverter { + + private String key; + private V value; + private final Converter keyConverter; + private final Converter valueConverter; + + public MapKeyValueConverter(GroupType keyValueType, Schema mapSchema, + GenericData model) { + keyConverter = new PrimitiveConverter() { + @Override + final public void addBinary(Binary value) { + key = value.toStringUsingUTF8(); + } + }; + + Type valueType = keyValueType.getType(1); + Schema nonNullValueSchema = AvroSchemaConverter.getNonNull(mapSchema.getValueType()); + valueConverter = newConverter(nonNullValueSchema, valueType, model, new ParentValueContainer() { + @Override + @SuppressWarnings("unchecked") + void add(Object value) { + MapKeyValueConverter.this.value = (V) value; + } + }); + } + + @Override + public Converter getConverter(int fieldIndex) { + if (fieldIndex == 0) { + return keyConverter; + } else if (fieldIndex == 1) { + return valueConverter; + } + throw new IllegalArgumentException("only the key (0) and value (1) fields expected: " 
+ fieldIndex); + } + + @Override + public void start() { + key = null; + value = null; + } + + @Override + public void end() { + map.put(key, value); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java new file mode 100644 index 0000000..42bfb09 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetInputFormat.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.mapreduce.Job; +import org.apache.parquet.hadoop.ParquetInputFormat; +import org.apache.parquet.hadoop.util.ContextUtil; + +/** + * A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files. 
+ */ +public class AvroParquetInputFormat extends ParquetInputFormat { + public AvroParquetInputFormat() { + super(AvroReadSupport.class); + } + + /** + * Set the subset of columns to read (projection pushdown). Specified as an Avro + * schema, the requested projection is converted into a Parquet schema for Parquet + * column projection. + *

+ * This is useful if the full schema is large and you only want to read a few + * columns, since it saves time by not reading unused columns. + *

+ * If a requested projection is set, then the Avro schema used for reading + * must be compatible with the projection. For instance, if a column is not included + * in the projection then it must either not be included or be optional in the read + * schema. Use {@link #setAvroReadSchema(org.apache.hadoop.mapreduce.Job, + * org.apache.avro.Schema)} to set a read schema, if needed. + * @param job + * @param requestedProjection + * @see #setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + */ + public static void setRequestedProjection(Job job, Schema requestedProjection) { + AvroReadSupport.setRequestedProjection(ContextUtil.getConfiguration(job), + requestedProjection); + } + + /** + * Override the Avro schema to use for reading. If not set, the Avro schema used for + * writing is used. + *

+ * Differences between the read and write schemas are resolved using + * Avro's schema resolution rules. + * @param job + * @param avroReadSchema + * @see #setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + */ + public static void setAvroReadSchema(Job job, Schema avroReadSchema) { + AvroReadSupport.setAvroReadSchema(ContextUtil.getConfiguration(job), avroReadSchema); + } + + /** + * Uses an instance of the specified {@link AvroDataSupplier} class to control how the + * {@link org.apache.avro.specific.SpecificData} instance that is used to find + * Avro specific records is created. + * @param job + * @param supplierClass + */ + public static void setAvroDataSupplier(Job job, + Class supplierClass) { + AvroReadSupport.setAvroDataSupplier(ContextUtil.getConfiguration(job), supplierClass); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java new file mode 100644 index 0000000..afca74f --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetOutputFormat.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.mapreduce.Job; +import org.apache.parquet.avro.AvroWriteSupport; +import org.apache.parquet.hadoop.ParquetOutputFormat; +import org.apache.parquet.hadoop.util.ContextUtil; + +/** + * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files. + */ +public class AvroParquetOutputFormat extends ParquetOutputFormat { + + /** + * Set the Avro schema to use for writing. The schema is translated into a Parquet + * schema so that the records can be written in Parquet format. It is also + * stored in the Parquet metadata so that records can be reconstructed as Avro + * objects at read time without specifying a read schema. 
+ * @param job + * @param schema + * @see org.apache.parquet.avro.AvroParquetInputFormat#setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + */ + public static void setSchema(Job job, Schema schema) { + AvroWriteSupport.setSchema(ContextUtil.getConfiguration(job), schema); + } + + public AvroParquetOutputFormat() { + super(new AvroWriteSupport()); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java new file mode 100644 index 0000000..40cf5eb --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetReader.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.avro; + +import java.io.IOException; + +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import org.apache.parquet.filter.UnboundRecordFilter; +import org.apache.parquet.hadoop.ParquetReader; + +/** + * Read Avro records from a Parquet file. + */ +public class AvroParquetReader extends ParquetReader { + + public static Builder builder(Path file) { + return ParquetReader.builder(new AvroReadSupport(), file); + } + + /** + * @deprecated use {@link #builder(Path)} + */ + @Deprecated + public AvroParquetReader(Path file) throws IOException { + super(file, new AvroReadSupport()); + } + + /** + * @deprecated use {@link #builder(Path)} + */ + @Deprecated + public AvroParquetReader(Path file, UnboundRecordFilter unboundRecordFilter) throws IOException { + super(file, new AvroReadSupport(), unboundRecordFilter); + } + + /** + * @deprecated use {@link #builder(Path)} + */ + @Deprecated + public AvroParquetReader(Configuration conf, Path file) throws IOException { + super(conf, file, new AvroReadSupport()); + } + + /** + * @deprecated use {@link #builder(Path)} + */ + @Deprecated + public AvroParquetReader(Configuration conf, Path file, UnboundRecordFilter unboundRecordFilter) throws IOException { + super(conf, file, new AvroReadSupport(), unboundRecordFilter); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java new file mode 100644 index 0000000..afa2c6d --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import java.io.IOException; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +/** + * Write Avro records to a Parquet file. + */ +public class AvroParquetWriter extends ParquetWriter { + + /** Create a new {@link AvroParquetWriter}. + * + * @param file + * @param avroSchema + * @param compressionCodecName + * @param blockSize + * @param pageSize + * @throws IOException + */ + public AvroParquetWriter(Path file, Schema avroSchema, + CompressionCodecName compressionCodecName, int blockSize, + int pageSize) throws IOException { + super(file, AvroParquetWriter.writeSupport(avroSchema), + compressionCodecName, blockSize, pageSize); + } + + /** Create a new {@link AvroParquetWriter}. + * + * @param file The file name to write to. + * @param avroSchema The schema to write with. + * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED + * @param blockSize the block size threshold. 
+ * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes. + * @param enableDictionary Whether to use a dictionary to compress columns. + * @throws IOException + */ + public AvroParquetWriter(Path file, Schema avroSchema, + CompressionCodecName compressionCodecName, int blockSize, + int pageSize, boolean enableDictionary) throws IOException { + super(file, AvroParquetWriter.writeSupport(avroSchema), + compressionCodecName, blockSize, pageSize, enableDictionary, + DEFAULT_IS_VALIDATING_ENABLED); + } + + /** Create a new {@link AvroParquetWriter}. The default block size is 50 MB.The default + * page size is 1 MB. Default compression is no compression. (Inherited from {@link ParquetWriter}) + * + * @param file The file name to write to. + * @param avroSchema The schema to write with. + * @throws IOException + */ + public AvroParquetWriter(Path file, Schema avroSchema) throws IOException { + this(file, avroSchema, CompressionCodecName.UNCOMPRESSED, + DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE); + } + + /** Create a new {@link AvroParquetWriter}. + * + * @param file The file name to write to. + * @param avroSchema The schema to write with. + * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED + * @param blockSize the block size threshold. + * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes. + * @param enableDictionary Whether to use a dictionary to compress columns. + * @param conf The Configuration to use. 
+ * @throws IOException + */ + public AvroParquetWriter(Path file, Schema avroSchema, + CompressionCodecName compressionCodecName, + int blockSize, int pageSize, boolean enableDictionary, + Configuration conf) throws IOException { + super(file, AvroParquetWriter.writeSupport(avroSchema), + compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, + DEFAULT_IS_VALIDATING_ENABLED, DEFAULT_WRITER_VERSION, conf); + } + + @SuppressWarnings("unchecked") + private static WriteSupport writeSupport(Schema avroSchema) { + return (WriteSupport) new AvroWriteSupport( + new AvroSchemaConverter().convert(avroSchema), avroSchema); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java new file mode 100644 index 0000000..9f1ba46 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.avro; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; + +/** + * Avro implementation of {@link ReadSupport} for Avro {@link IndexedRecord}s which cover both Avro Specific and + * Generic. Users should use {@link AvroParquetReader} or {@link AvroParquetInputFormat} rather than using + * this class directly. + */ +public class AvroReadSupport extends ReadSupport { + + public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection"; + private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema"; + + static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema"; + // older files were written with the schema in this metadata key + static final String OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema"; + private static final String AVRO_READ_SCHEMA_METADATA_KEY = "avro.read.schema"; + + public static String AVRO_DATA_SUPPLIER = "parquet.avro.data.supplier"; + + /** + * @see org.apache.parquet.avro.AvroParquetInputFormat#setRequestedProjection(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + */ + public static void setRequestedProjection(Configuration configuration, Schema requestedProjection) { + configuration.set(AVRO_REQUESTED_PROJECTION, requestedProjection.toString()); + } + + /** + * @see org.apache.parquet.avro.AvroParquetInputFormat#setAvroReadSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + */ + public static void setAvroReadSchema(Configuration configuration, Schema avroReadSchema) { + configuration.set(AVRO_READ_SCHEMA, avroReadSchema.toString()); + } + + public static void setAvroDataSupplier(Configuration configuration, + Class clazz) { + 
configuration.set(AVRO_DATA_SUPPLIER, clazz.getName()); + } + + @Override + public ReadContext init(Configuration configuration, + Map keyValueMetaData, + MessageType fileSchema) { + MessageType projection = fileSchema; + Map metadata = null; + + String requestedProjectionString = configuration.get(AVRO_REQUESTED_PROJECTION); + if (requestedProjectionString != null) { + Schema avroRequestedProjection = new Schema.Parser().parse(requestedProjectionString); + projection = new AvroSchemaConverter(configuration).convert(avroRequestedProjection); + } + String avroReadSchema = configuration.get(AVRO_READ_SCHEMA); + if (avroReadSchema != null) { + metadata = new LinkedHashMap(); + metadata.put(AVRO_READ_SCHEMA_METADATA_KEY, avroReadSchema); + } + return new ReadContext(projection, metadata); + } + + @Override + public RecordMaterializer prepareForRead( + Configuration configuration, Map keyValueMetaData, + MessageType fileSchema, ReadContext readContext) { + MessageType parquetSchema = readContext.getRequestedSchema(); + Schema avroSchema; + if (readContext.getReadSupportMetadata() != null && + readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY) != null) { + // use the Avro read schema provided by the user + avroSchema = new Schema.Parser().parse(readContext.getReadSupportMetadata().get(AVRO_READ_SCHEMA_METADATA_KEY)); + } else if (keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY) != null) { + // use the Avro schema from the file metadata if present + avroSchema = new Schema.Parser().parse(keyValueMetaData.get(AVRO_SCHEMA_METADATA_KEY)); + } else if (keyValueMetaData.get(OLD_AVRO_SCHEMA_METADATA_KEY) != null) { + // use the Avro schema from the file metadata if present + avroSchema = new Schema.Parser().parse(keyValueMetaData.get(OLD_AVRO_SCHEMA_METADATA_KEY)); + } else { + // default to converting the Parquet schema into an Avro schema + avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema); + } + Class suppClass = 
configuration.getClass(AVRO_DATA_SUPPLIER, + SpecificDataSupplier.class, + AvroDataSupplier.class); + AvroDataSupplier supplier =ReflectionUtils.newInstance(suppClass, configuration); + return new AvroRecordMaterializer(parquetSchema, avroSchema, supplier.get()); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java new file mode 100644 index 0000000..1794929 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordMaterializer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.IndexedRecord; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.schema.MessageType; + +class AvroRecordMaterializer extends RecordMaterializer { + + private AvroIndexedRecordConverter root; + + public AvroRecordMaterializer(MessageType requestedSchema, Schema avroSchema, + GenericData baseModel) { + this.root = new AvroIndexedRecordConverter(requestedSchema, avroSchema, baseModel); + } + + @Override + public T getCurrentRecord() { + return root.getCurrentRecord(); + } + + @Override + public GroupConverter getRootConverter() { + return root; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java new file mode 100644 index 0000000..19aa2b3 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import java.util.*; + +import org.apache.avro.Schema; + +import org.apache.hadoop.conf.Configuration; +import org.codehaus.jackson.node.NullNode; +import org.apache.parquet.schema.ConversionPatterns; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; + +import static org.apache.parquet.schema.OriginalType.*; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; + +/** + *

+ * Converts an Avro schema into a Parquet schema. See package documentation for details + * of the mapping. + *

+ */ +public class AvroSchemaConverter { + + static final String ADD_LIST_ELEMENT_RECORDS = + "parquet.avro.add-list-element-records"; + private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true; + + private final boolean assumeRepeatedIsListElement; + + public AvroSchemaConverter() { + this.assumeRepeatedIsListElement = ADD_LIST_ELEMENT_RECORDS_DEFAULT; + } + + public AvroSchemaConverter(Configuration conf) { + this.assumeRepeatedIsListElement = conf.getBoolean( + ADD_LIST_ELEMENT_RECORDS, ADD_LIST_ELEMENT_RECORDS_DEFAULT); + } + + /** + * Given a schema, check to see if it is a union of a null type and a regular schema, + * and then return the non-null sub-schema. Otherwise, return the given schema. + * + * @param schema The schema to check + * @return The non-null portion of a union schema, or the given schema + */ + public static Schema getNonNull(Schema schema) { + if (schema.getType().equals(Schema.Type.UNION)) { + List schemas = schema.getTypes(); + if (schemas.size() == 2) { + if (schemas.get(0).getType().equals(Schema.Type.NULL)) { + return schemas.get(1); + } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { + return schemas.get(0); + } else { + return schema; + } + } else { + return schema; + } + } else { + return schema; + } + } + + public MessageType convert(Schema avroSchema) { + if (!avroSchema.getType().equals(Schema.Type.RECORD)) { + throw new IllegalArgumentException("Avro schema must be a record."); + } + return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields())); + } + + private List convertFields(List fields) { + List types = new ArrayList(); + for (Schema.Field field : fields) { + if (field.schema().getType().equals(Schema.Type.NULL)) { + continue; // Avro nulls are not encoded, unless they are null unions + } + types.add(convertField(field)); + } + return types; + } + + private Type convertField(String fieldName, Schema schema) { + return convertField(fieldName, schema, 
Type.Repetition.REQUIRED); + } + + private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) { + Schema.Type type = schema.getType(); + if (type.equals(Schema.Type.BOOLEAN)) { + return primitive(fieldName, BOOLEAN, repetition); + } else if (type.equals(Schema.Type.INT)) { + return primitive(fieldName, INT32, repetition); + } else if (type.equals(Schema.Type.LONG)) { + return primitive(fieldName, INT64, repetition); + } else if (type.equals(Schema.Type.FLOAT)) { + return primitive(fieldName, FLOAT, repetition); + } else if (type.equals(Schema.Type.DOUBLE)) { + return primitive(fieldName, DOUBLE, repetition); + } else if (type.equals(Schema.Type.BYTES)) { + return primitive(fieldName, BINARY, repetition); + } else if (type.equals(Schema.Type.STRING)) { + return primitive(fieldName, BINARY, repetition, UTF8); + } else if (type.equals(Schema.Type.RECORD)) { + return new GroupType(repetition, fieldName, convertFields(schema.getFields())); + } else if (type.equals(Schema.Type.ENUM)) { + return primitive(fieldName, BINARY, repetition, ENUM); + } else if (type.equals(Schema.Type.ARRAY)) { + return ConversionPatterns.listType(repetition, fieldName, + convertField("array", schema.getElementType(), Type.Repetition.REPEATED)); + } else if (type.equals(Schema.Type.MAP)) { + Type valType = convertField("value", schema.getValueType()); + // avro map key type is always string + return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType); + } else if (type.equals(Schema.Type.FIXED)) { + return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition, + schema.getFixedSize(), null); + } else if (type.equals(Schema.Type.UNION)) { + return convertUnion(fieldName, schema, repetition); + } + throw new UnsupportedOperationException("Cannot convert Avro type " + type); + } + + private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) { + List nonNullSchemas = new ArrayList(schema.getTypes().size()); + for (Schema 
childSchema : schema.getTypes()) { + if (childSchema.getType().equals(Schema.Type.NULL)) { + if (Type.Repetition.REQUIRED == repetition) { + repetition = Type.Repetition.OPTIONAL; + } + } else { + nonNullSchemas.add(childSchema); + } + } + // If we only get a null and one other type then its a simple optional field + // otherwise construct a union container + switch (nonNullSchemas.size()) { + case 0: + throw new UnsupportedOperationException("Cannot convert Avro union of only nulls"); + + case 1: + return convertField(fieldName, nonNullSchemas.get(0), repetition); + + default: // complex union type + List unionTypes = new ArrayList(nonNullSchemas.size()); + int index = 0; + for (Schema childSchema : nonNullSchemas) { + unionTypes.add( convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL)); + } + return new GroupType(repetition, fieldName, unionTypes); + } + } + + private Type convertField(Schema.Field field) { + return convertField(field.name(), field.schema()); + } + + private PrimitiveType primitive(String name, + PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition, + int typeLength, OriginalType originalType) { + return new PrimitiveType(repetition, primitive, typeLength, name, + originalType); + } + + private PrimitiveType primitive(String name, + PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition, + OriginalType originalType) { + return new PrimitiveType(repetition, primitive, name, originalType); + } + + private PrimitiveType primitive(String name, + PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) { + return new PrimitiveType(repetition, primitive, name, null); + } + + public Schema convert(MessageType parquetSchema) { + return convertFields(parquetSchema.getName(), parquetSchema.getFields()); + } + + private Schema convertFields(String name, List parquetFields) { + List fields = new ArrayList(); + for (Type parquetType : parquetFields) { + Schema fieldSchema = 
convertField(parquetType); + if (parquetType.isRepetition(Type.Repetition.REPEATED)) { + throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType); + } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) { + fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null, + NullNode.getInstance())); + } else { // REQUIRED + fields.add(new Schema.Field(parquetType.getName(), fieldSchema, null, null)); + } + } + Schema schema = Schema.createRecord(name, null, null, false); + schema.setFields(fields); + return schema; + } + + private Schema convertField(final Type parquetType) { + if (parquetType.isPrimitive()) { + final PrimitiveTypeName parquetPrimitiveTypeName = + parquetType.asPrimitiveType().getPrimitiveTypeName(); + final OriginalType originalType = parquetType.getOriginalType(); + return parquetPrimitiveTypeName.convert( + new PrimitiveType.PrimitiveTypeNameConverter() { + @Override + public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { + return Schema.create(Schema.Type.BOOLEAN); + } + @Override + public Schema convertINT32(PrimitiveTypeName primitiveTypeName) { + return Schema.create(Schema.Type.INT); + } + @Override + public Schema convertINT64(PrimitiveTypeName primitiveTypeName) { + return Schema.create(Schema.Type.LONG); + } + @Override + public Schema convertINT96(PrimitiveTypeName primitiveTypeName) { + throw new IllegalArgumentException("INT64 not yet implemented."); + } + @Override + public Schema convertFLOAT(PrimitiveTypeName primitiveTypeName) { + return Schema.create(Schema.Type.FLOAT); + } + @Override + public Schema convertDOUBLE(PrimitiveTypeName primitiveTypeName) { + return Schema.create(Schema.Type.DOUBLE); + } + @Override + public Schema convertFIXED_LEN_BYTE_ARRAY(PrimitiveTypeName primitiveTypeName) { + int size = parquetType.asPrimitiveType().getTypeLength(); + return Schema.createFixed(parquetType.getName(), null, null, size); + } + @Override + 
public Schema convertBINARY(PrimitiveTypeName primitiveTypeName) { + if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) { + return Schema.create(Schema.Type.STRING); + } else { + return Schema.create(Schema.Type.BYTES); + } + } + }); + } else { + GroupType parquetGroupType = parquetType.asGroupType(); + OriginalType originalType = parquetGroupType.getOriginalType(); + if (originalType != null) { + switch(originalType) { + case LIST: + if (parquetGroupType.getFieldCount()!= 1) { + throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); + } + Type repeatedType = parquetGroupType.getType(0); + if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) { + throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); + } + if (isElementType(repeatedType, parquetGroupType.getName())) { + // repeated element types are always required + return Schema.createArray(convertField(repeatedType)); + } else { + Type elementType = repeatedType.asGroupType().getType(0); + if (elementType.isRepetition(Type.Repetition.OPTIONAL)) { + return Schema.createArray(optional(convertField(elementType))); + } else { + return Schema.createArray(convertField(elementType)); + } + } + case MAP_KEY_VALUE: // for backward-compatibility + case MAP: + if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) { + throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); + } + GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType(); + if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) || + !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) || + mapKeyValType.getFieldCount()!=2) { + throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); + } + Type keyType = mapKeyValType.getType(0); + if (!keyType.isPrimitive() || + !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveTypeName.BINARY) || + 
!keyType.getOriginalType().equals(OriginalType.UTF8)) { + throw new IllegalArgumentException("Map key type must be binary (UTF8): " + + keyType); + } + Type valueType = mapKeyValType.getType(1); + if (valueType.isRepetition(Type.Repetition.OPTIONAL)) { + return Schema.createMap(optional(convertField(valueType))); + } else { + return Schema.createMap(convertField(valueType)); + } + case ENUM: + return Schema.create(Schema.Type.STRING); + case UTF8: + default: + throw new UnsupportedOperationException("Cannot convert Parquet type " + + parquetType); + + } + } else { + // if no original type then it's a record + return convertFields(parquetGroupType.getName(), parquetGroupType.getFields()); + } + } + } + + /** + * Implements the rules for interpreting existing data from the logical type + * spec for the LIST annotation. This is used to produce the expected schema. + *

+ * The AvroArrayConverter will decide whether the repeated type is the array + * element type by testing whether the element schema and repeated type are + * the same. This ensures that the LIST rules are followed when there is no + * schema and that a schema can be provided to override the default behavior. + */ + private boolean isElementType(Type repeatedType, String parentName) { + return ( + // can't be a synthetic layer because it would be invalid + repeatedType.isPrimitive() || + repeatedType.asGroupType().getFieldCount() > 1 || + // known patterns without the synthetic layer + repeatedType.getName().equals("array") || + repeatedType.getName().equals(parentName + "_tuple") || + // default assumption + assumeRepeatedIsListElement + ); + } + + private static Schema optional(Schema original) { + // null is first in the union because Parquet's default is always null + return Schema.createUnion(Arrays.asList( + Schema.create(Schema.Type.NULL), + original)); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java new file mode 100644 index 0000000..2ec8ee1 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericFixed; +import org.apache.avro.generic.IndexedRecord; +import org.apache.avro.util.Utf8; +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; + +/** + * Avro implementation of {@link WriteSupport} for {@link IndexedRecord}s - both Avro Generic and Specific. + * Users should use {@link AvroParquetWriter} or {@link AvroParquetOutputFormat} rather than using + * this class directly. 
+ */ +public class AvroWriteSupport extends WriteSupport { + + static final String AVRO_SCHEMA = "parquet.avro.schema"; + private static final Schema MAP_KEY_SCHEMA = Schema.create(Schema.Type.STRING); + + private RecordConsumer recordConsumer; + private MessageType rootSchema; + private Schema rootAvroSchema; + + public AvroWriteSupport() { + } + + public AvroWriteSupport(MessageType schema, Schema avroSchema) { + this.rootSchema = schema; + this.rootAvroSchema = avroSchema; + } + + /** + * @see org.apache.parquet.avro.AvroParquetOutputFormat#setSchema(org.apache.hadoop.mapreduce.Job, org.apache.avro.Schema) + */ + public static void setSchema(Configuration configuration, Schema schema) { + configuration.set(AVRO_SCHEMA, schema.toString()); + } + + @Override + public WriteContext init(Configuration configuration) { + if (rootAvroSchema == null) { + rootAvroSchema = new Schema.Parser().parse(configuration.get(AVRO_SCHEMA)); + rootSchema = new AvroSchemaConverter().convert(rootAvroSchema); + } + Map extraMetaData = new HashMap(); + extraMetaData.put(AvroReadSupport.AVRO_SCHEMA_METADATA_KEY, rootAvroSchema.toString()); + return new WriteContext(rootSchema, extraMetaData); + } + + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + @Override + public void write(IndexedRecord record) { + recordConsumer.startMessage(); + writeRecordFields(rootSchema, rootAvroSchema, record); + recordConsumer.endMessage(); + } + + private void writeRecord(GroupType schema, Schema avroSchema, + IndexedRecord record) { + recordConsumer.startGroup(); + writeRecordFields(schema, avroSchema, record); + recordConsumer.endGroup(); + } + + private void writeRecordFields(GroupType schema, Schema avroSchema, + IndexedRecord record) { + List fields = schema.getFields(); + List avroFields = avroSchema.getFields(); + int index = 0; // parquet ignores Avro nulls, so index may differ + for (int avroIndex = 0; avroIndex < 
avroFields.size(); avroIndex++) { + Schema.Field avroField = avroFields.get(avroIndex); + if (avroField.schema().getType().equals(Schema.Type.NULL)) { + continue; + } + Type fieldType = fields.get(index); + Object value = record.get(avroIndex); + if (value != null) { + recordConsumer.startField(fieldType.getName(), index); + writeValue(fieldType, avroField.schema(), value); + recordConsumer.endField(fieldType.getName(), index); + } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) { + throw new RuntimeException("Null-value for required field: " + avroField.name()); + } + index++; + } + } + + private void writeArray(GroupType schema, Schema avroSchema, + Collection array) { + recordConsumer.startGroup(); // group wrapper (original type LIST) + if (array.size() > 0) { + recordConsumer.startField("array", 0); + for (T elt : array) { + writeValue(schema.getType(0), avroSchema.getElementType(), elt); + } + recordConsumer.endField("array", 0); + } + recordConsumer.endGroup(); + } + + private void writeMap(GroupType schema, Schema avroSchema, + Map map) { + GroupType innerGroup = schema.getType(0).asGroupType(); + Type keyType = innerGroup.getType(0); + Type valueType = innerGroup.getType(1); + + recordConsumer.startGroup(); // group wrapper (original type MAP) + if (map.size() > 0) { + recordConsumer.startField("map", 0); + + for (Map.Entry entry : map.entrySet()) { + recordConsumer.startGroup(); // repeated group key_value, middle layer + recordConsumer.startField("key", 0); + writeValue(keyType, MAP_KEY_SCHEMA, entry.getKey()); + recordConsumer.endField("key", 0); + V value = entry.getValue(); + if (value != null) { + recordConsumer.startField("value", 1); + writeValue(valueType, avroSchema.getValueType(), value); + recordConsumer.endField("value", 1); + } else if (!valueType.isRepetition(Type.Repetition.OPTIONAL)) { + throw new RuntimeException("Null map value for " + avroSchema.getName()); + } + recordConsumer.endGroup(); + } + + 
recordConsumer.endField("map", 0); + } + recordConsumer.endGroup(); + } + + private void writeUnion(GroupType parquetSchema, Schema avroSchema, + Object value) { + recordConsumer.startGroup(); + + // ResolveUnion will tell us which of the union member types to + // deserialise. + int avroIndex = GenericData.get().resolveUnion(avroSchema, value); + + // For parquet's schema we skip nulls + GroupType parquetGroup = parquetSchema.asGroupType(); + int parquetIndex = avroIndex; + for (int i = 0; i < avroIndex; i++) { + if (avroSchema.getTypes().get(i).getType().equals(Schema.Type.NULL)) { + parquetIndex--; + } + } + + // Sparsely populated method of encoding unions, each member has its own + // set of columns. + String memberName = "member" + parquetIndex; + recordConsumer.startField(memberName, parquetIndex); + writeValue(parquetGroup.getType(parquetIndex), + avroSchema.getTypes().get(avroIndex), value); + recordConsumer.endField(memberName, parquetIndex); + + recordConsumer.endGroup(); + } + + @SuppressWarnings("unchecked") + private void writeValue(Type type, Schema avroSchema, Object value) { + Schema nonNullAvroSchema = AvroSchemaConverter.getNonNull(avroSchema); + Schema.Type avroType = nonNullAvroSchema.getType(); + if (avroType.equals(Schema.Type.BOOLEAN)) { + recordConsumer.addBoolean((Boolean) value); + } else if (avroType.equals(Schema.Type.INT)) { + recordConsumer.addInteger(((Number) value).intValue()); + } else if (avroType.equals(Schema.Type.LONG)) { + recordConsumer.addLong(((Number) value).longValue()); + } else if (avroType.equals(Schema.Type.FLOAT)) { + recordConsumer.addFloat(((Number) value).floatValue()); + } else if (avroType.equals(Schema.Type.DOUBLE)) { + recordConsumer.addDouble(((Number) value).doubleValue()); + } else if (avroType.equals(Schema.Type.BYTES)) { + recordConsumer.addBinary(Binary.fromByteBuffer((ByteBuffer) value)); + } else if (avroType.equals(Schema.Type.STRING)) { + recordConsumer.addBinary(fromAvroString(value)); + } else if 
(avroType.equals(Schema.Type.RECORD)) { + writeRecord((GroupType) type, nonNullAvroSchema, (IndexedRecord) value); + } else if (avroType.equals(Schema.Type.ENUM)) { + recordConsumer.addBinary(Binary.fromString(value.toString())); + } else if (avroType.equals(Schema.Type.ARRAY)) { + writeArray((GroupType) type, nonNullAvroSchema, (Collection) value); + } else if (avroType.equals(Schema.Type.MAP)) { + writeMap((GroupType) type, nonNullAvroSchema, (Map) value); + } else if (avroType.equals(Schema.Type.UNION)) { + writeUnion((GroupType) type, nonNullAvroSchema, value); + } else if (avroType.equals(Schema.Type.FIXED)) { + recordConsumer.addBinary(Binary.fromByteArray(((GenericFixed) value).bytes())); + } + } + + private Binary fromAvroString(Object value) { + if (value instanceof Utf8) { + Utf8 utf8 = (Utf8) value; + return Binary.fromByteArray(utf8.getBytes(), 0, utf8.getByteLength()); + } + return Binary.fromString(value.toString()); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java b/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java new file mode 100644 index 0000000..2bd2599 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/SpecificDataSupplier.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.avro; + +import org.apache.avro.generic.GenericData; +import org.apache.avro.specific.SpecificData; + +class SpecificDataSupplier implements AvroDataSupplier { + @Override + public GenericData get() { + return SpecificData.get(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java b/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java new file mode 100644 index 0000000..e5e0475 --- /dev/null +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/package-info.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * + *

+ * Provides classes to store Avro data in Parquet files. Avro schemas are converted to + * Parquet schemas as follows. Only record schemas are converted; + * other top-level schema types are not supported, and attempting to convert one results + * in an error. Avro types are converted to Parquet types using the mapping shown here: + *

+ * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
Avro type | Parquet type
null | no type (the field is not encoded in Parquet), unless a null union
boolean | boolean
int | int32
long | int64
float | float
double | double
bytes | binary
string | binary (with original type UTF8)
record | group containing nested fields
enum | binary (with original type ENUM)
array | group (with original type LIST) containing one repeated group field
map | group (with original type MAP) containing one repeated group field (with original type MAP_KEY_VALUE) of (key, value)
fixed | fixed_len_byte_array
union | an optional type, in the case of a null union; otherwise a group containing one optional field per non-null member type (member0, member1, ...)
+ * + *

+ * For Parquet files that were not written with classes from this package there is no + * Avro write schema stored in the Parquet file metadata. To read such files using + * classes from this package you must either provide an Avro read schema, + * or a default Avro schema will be derived using the following mapping. + *

+ * + * + * Parquet type + * Avro type + * + * + * boolean + * boolean + * + * + * int32 + * int + * + * + * int64 + * long + * + * + * int96 + * not supported + * + * + * float + * float + * + * + * double + * double + * + * + * fixed_len_byte_array + * fixed + * + * + * binary (with no original type) + * bytes + * + * + * binary (with original type UTF8) + * string + * + * + * binary (with original type ENUM) + * string + * + * + * group (with original type LIST) containing one repeated group field + * array + * + * + * group (with original type MAP) containing one repeated group + * field (with original type MAP_KEY_VALUE) of (key, value) + * map + * + * + * + *

+ * Parquet fields that are optional are mapped to an Avro null union. + *

+ * + *

+ * Some conversions are lossy. Avro nulls are not represented in Parquet, + * so they are lost when converted back to Avro. Similarly, a Parquet enum does not + * store its values, so it cannot be converted back to an Avro enum, + * which is why an Avro string had to suffice. Type names for nested records, enums, + * and fixed types are lost in the conversion to Parquet. + * Avro aliases, default values, field ordering, and documentation strings are all + * dropped in the conversion to Parquet. + * + * Parquet maps can have any type for keys, but this is not true in Avro where map keys + * are assumed to be strings. + *

+ */ +package org.apache.parquet.avro; http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java b/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java deleted file mode 100644 index c6c04ab..0000000 --- a/parquet-avro/src/main/java/parquet/avro/AvroDataSupplier.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package parquet.avro; - -import org.apache.avro.generic.GenericData; - -/** - * Allows clients to control how the classes associated - * with specific Avro records are managed and found, e.g., - * by creating an instance of {@code GenericData} that - * uses a particular {@code ClassLoader}. - */ -public interface AvroDataSupplier { - GenericData get(); -}