Return-Path: X-Original-To: apmail-parquet-commits-archive@minotaur.apache.org Delivered-To: apmail-parquet-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 98497176C1 for ; Mon, 27 Apr 2015 23:12:00 +0000 (UTC) Received: (qmail 48746 invoked by uid 500); 27 Apr 2015 23:12:00 -0000 Delivered-To: apmail-parquet-commits-archive@parquet.apache.org Received: (qmail 48693 invoked by uid 500); 27 Apr 2015 23:12:00 -0000 Mailing-List: contact commits-help@parquet.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.incubator.apache.org Delivered-To: mailing list commits@parquet.incubator.apache.org Received: (qmail 47803 invoked by uid 99); 27 Apr 2015 23:11:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Apr 2015 23:11:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 00A23E10A4; Mon, 27 Apr 2015 23:11:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: blue@apache.org To: commits@parquet.apache.org Date: Mon, 27 Apr 2015 23:12:35 -0000 Message-Id: <326a562b8df84ddc9e9e242536b4b406@git.apache.org> In-Reply-To: <190ba0de36204468a8a5a0e9d143ae5d@git.apache.org> References: <190ba0de36204468a8a5a0e9d143ae5d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/51] [partial] parquet-mr git commit: PARQUET-23: Rename to org.apache.parquet. 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java new file mode 100644 index 0000000..4f8b10d --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/predicate/ValidTypeMap.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.filter2.predicate; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.filter2.predicate.Operators.Column; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; + +/** + * Contains all valid mappings from class -> parquet type (and vice versa) for use in + * {@link FilterPredicate}s + * + * This is a bit ugly, but it allows us to provide good error messages at runtime + * when there are type mismatches. + * + * TODO: this has some overlap with {@link PrimitiveTypeName#javaType} + * TODO: (https://issues.apache.org/jira/browse/PARQUET-30) + */ +public class ValidTypeMap { + private ValidTypeMap() { } + + // classToParquetType and parquetTypeToClass are used as a bi-directional map + private static final Map, Set> classToParquetType = new HashMap, Set>(); + private static final Map>> parquetTypeToClass = new HashMap>>(); + + // set up the mapping in both directions + private static void add(Class c, FullTypeDescriptor f) { + Set descriptors = classToParquetType.get(c); + if (descriptors == null) { + descriptors = new HashSet(); + classToParquetType.put(c, descriptors); + } + descriptors.add(f); + + Set> classes = parquetTypeToClass.get(f); + if (classes == null) { + classes = new HashSet>(); + parquetTypeToClass.put(f, classes); + } + classes.add(c); + } + + static { + // basic primitive columns + add(Integer.class, new FullTypeDescriptor(PrimitiveTypeName.INT32, null)); + add(Long.class, new FullTypeDescriptor(PrimitiveTypeName.INT64, null)); + add(Float.class, new FullTypeDescriptor(PrimitiveTypeName.FLOAT, null)); + add(Double.class, new FullTypeDescriptor(PrimitiveTypeName.DOUBLE, null)); + add(Boolean.class, new FullTypeDescriptor(PrimitiveTypeName.BOOLEAN, null)); + + // Both of these binary 
types are valid + add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, null)); + add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null)); + + add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, OriginalType.UTF8)); + add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8)); + } + + /** + * Asserts that foundColumn was declared as a type that is compatible with the type for this column found + * in the schema of the parquet file. + * + * @throws java.lang.IllegalArgumentException if the types do not align + * + * @param foundColumn the column as declared by the user + * @param primitiveType the primitive type according to the schema + * @param originalType the original type according to the schema + */ + public static > void assertTypeValid(Column foundColumn, PrimitiveTypeName primitiveType, OriginalType originalType) { + Class foundColumnType = foundColumn.getColumnType(); + ColumnPath columnPath = foundColumn.getColumnPath(); + + Set validTypeDescriptors = classToParquetType.get(foundColumnType); + FullTypeDescriptor typeInFileMetaData = new FullTypeDescriptor(primitiveType, originalType); + + if (validTypeDescriptors == null) { + StringBuilder message = new StringBuilder(); + message + .append("Column ") + .append(columnPath.toDotString()) + .append(" was declared as type: ") + .append(foundColumnType.getName()) + .append(" which is not supported in FilterPredicates."); + + Set> supportedTypes = parquetTypeToClass.get(typeInFileMetaData); + if (supportedTypes != null) { + message + .append(" Supported types for this column are: ") + .append(supportedTypes); + } else { + message.append(" There are no supported types for columns of " + typeInFileMetaData); + } + throw new IllegalArgumentException(message.toString()); + } + + if (!validTypeDescriptors.contains(typeInFileMetaData)) { + StringBuilder message = new StringBuilder(); + message + 
.append("FilterPredicate column: ") + .append(columnPath.toDotString()) + .append("'s declared type (") + .append(foundColumnType.getName()) + .append(") does not match the schema found in file metadata. Column ") + .append(columnPath.toDotString()) + .append(" is of type: ") + .append(typeInFileMetaData) + .append("\nValid types for this column are: ") + .append(parquetTypeToClass.get(typeInFileMetaData)); + throw new IllegalArgumentException(message.toString()); + } + } + + private static final class FullTypeDescriptor { + private final PrimitiveTypeName primitiveType; + private final OriginalType originalType; + + private FullTypeDescriptor(PrimitiveTypeName primitiveType, OriginalType originalType) { + this.primitiveType = primitiveType; + this.originalType = originalType; + } + + public PrimitiveTypeName getPrimitiveType() { + return primitiveType; + } + + public OriginalType getOriginalType() { + return originalType; + } + + @Override + public String toString() { + return "FullTypeDescriptor(" + "PrimitiveType: " + primitiveType + ", OriginalType: " + originalType + ')'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + FullTypeDescriptor that = (FullTypeDescriptor) o; + + if (originalType != that.originalType) return false; + if (primitiveType != that.primitiveType) return false; + + return true; + } + + @Override + public int hashCode() { + int result = primitiveType != null ? primitiveType.hashCode() : 0; + result = 31 * result + (originalType != null ? 
originalType.hashCode() : 0); + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java new file mode 100644 index 0000000..a76b5ee --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringGroupConverter.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.filter2.recordlevel; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; +import org.apache.parquet.io.PrimitiveColumnIO; +import org.apache.parquet.io.api.Converter; +import org.apache.parquet.io.api.GroupConverter; + +import static org.apache.parquet.Preconditions.checkArgument; +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * See {@link FilteringRecordMaterializer} + */ +public class FilteringGroupConverter extends GroupConverter { + // the real converter + private final GroupConverter delegate; + + // the path, from the root of the schema, to this converter + // used ultimately by the primitive converter proxy to figure + // out which column it represents. + private final List indexFieldPath; + + // for a given column, which nodes in the filter expression need + // to be notified of this column's value + private final Map> valueInspectorsByColumn; + + // used to go from our indexFieldPath to the PrimitiveColumnIO for that column + private final Map, PrimitiveColumnIO> columnIOsByIndexFieldPath; + + public FilteringGroupConverter( + GroupConverter delegate, + List indexFieldPath, + Map> valueInspectorsByColumn, Map, + PrimitiveColumnIO> columnIOsByIndexFieldPath) { + + this.delegate = checkNotNull(delegate, "delegate"); + this.indexFieldPath = checkNotNull(indexFieldPath, "indexFieldPath"); + this.columnIOsByIndexFieldPath = checkNotNull(columnIOsByIndexFieldPath, "columnIOsByIndexFieldPath"); + this.valueInspectorsByColumn = checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn"); + } + + // When a converter is asked for, we get the real one from the delegate, then wrap it + // in a filtering pass-through proxy. + // TODO: making the assumption that getConverter(i) is only called once, is that valid? 
+ @Override + public Converter getConverter(int fieldIndex) { + + // get the real converter from the delegate + Converter delegateConverter = checkNotNull(delegate.getConverter(fieldIndex), "delegate converter"); + + // determine the indexFieldPath for the converter proxy we're about to make, which is + // this converter's path + the requested fieldIndex + List newIndexFieldPath = new ArrayList(indexFieldPath.size() + 1); + newIndexFieldPath.addAll(indexFieldPath); + newIndexFieldPath.add(fieldIndex); + + if (delegateConverter.isPrimitive()) { + PrimitiveColumnIO columnIO = getColumnIO(newIndexFieldPath); + ColumnPath columnPath = ColumnPath.get(columnIO.getColumnDescriptor().getPath()); + ValueInspector[] valueInspectors = getValueInspectors(columnPath); + return new FilteringPrimitiveConverter(delegateConverter.asPrimitiveConverter(), valueInspectors); + } else { + return new FilteringGroupConverter(delegateConverter.asGroupConverter(), newIndexFieldPath, valueInspectorsByColumn, columnIOsByIndexFieldPath); + } + + } + + private PrimitiveColumnIO getColumnIO(List indexFieldPath) { + PrimitiveColumnIO found = columnIOsByIndexFieldPath.get(indexFieldPath); + checkArgument(found != null, "Did not find PrimitiveColumnIO for index field path" + indexFieldPath); + return found; + } + + private ValueInspector[] getValueInspectors(ColumnPath columnPath) { + List inspectorsList = valueInspectorsByColumn.get(columnPath); + if (inspectorsList == null) { + return new ValueInspector[] {}; + } else { + return inspectorsList.toArray(new ValueInspector[inspectorsList.size()]); + } + } + + @Override + public void start() { + delegate.start(); + } + + @Override + public void end() { + delegate.end(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java ---------------------------------------------------------------------- diff --git 
a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java new file mode 100644 index 0000000..18edb64 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringPrimitiveConverter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.filter2.recordlevel; + +import org.apache.parquet.column.Dictionary; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.PrimitiveConverter; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * see {@link FilteringRecordMaterializer} + * + * This pass-through proxy for a delegate {@link PrimitiveConverter} also + * updates the {@link ValueInspector}s of a {@link IncrementallyUpdatedFilterPredicate} + */ +public class FilteringPrimitiveConverter extends PrimitiveConverter { + private final PrimitiveConverter delegate; + private final ValueInspector[] valueInspectors; + + public FilteringPrimitiveConverter(PrimitiveConverter delegate, ValueInspector[] valueInspectors) { + this.delegate = checkNotNull(delegate, "delegate"); + this.valueInspectors = checkNotNull(valueInspectors, "valueInspectors"); + } + + // TODO: this works, but + // TODO: essentially turns off the benefits of dictionary support + // TODO: even if the underlying delegate supports it. + // TODO: we should support it here. 
(https://issues.apache.org/jira/browse/PARQUET-36) + @Override + public boolean hasDictionarySupport() { + return false; + } + + @Override + public void setDictionary(Dictionary dictionary) { + throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support"); + } + + @Override + public void addValueFromDictionary(int dictionaryId) { + throw new UnsupportedOperationException("FilteringPrimitiveConverter doesn't have dictionary support"); + } + + @Override + public void addBinary(Binary value) { + for (ValueInspector valueInspector : valueInspectors) { + valueInspector.update(value); + } + delegate.addBinary(value); + } + + @Override + public void addBoolean(boolean value) { + for (ValueInspector valueInspector : valueInspectors) { + valueInspector.update(value); + } + delegate.addBoolean(value); + } + + @Override + public void addDouble(double value) { + for (ValueInspector valueInspector : valueInspectors) { + valueInspector.update(value); + } + delegate.addDouble(value); + } + + @Override + public void addFloat(float value) { + for (ValueInspector valueInspector : valueInspectors) { + valueInspector.update(value); + } + delegate.addFloat(value); + } + + @Override + public void addInt(int value) { + for (ValueInspector valueInspector : valueInspectors) { + valueInspector.update(value); + } + delegate.addInt(value); + } + + @Override + public void addLong(long value) { + for (ValueInspector valueInspector : valueInspectors) { + valueInspector.update(value); + } + delegate.addLong(value); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java 
b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java new file mode 100644 index 0000000..d8fa677 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/FilteringRecordMaterializer.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.filter2.recordlevel; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; +import org.apache.parquet.io.PrimitiveColumnIO; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A pass-through proxy for a {@link RecordMaterializer} that updates a {@link IncrementallyUpdatedFilterPredicate} + * as it receives concrete values for the current record. 
If, after the record assembly signals that + * there are no more values, the predicate indicates that this record should be dropped, {@link #getCurrentRecord()} + * returns null to signal that this record is being skipped. + * Otherwise, the record is retrieved from the delegate. + */ +public class FilteringRecordMaterializer extends RecordMaterializer { + // the real record materializer + private final RecordMaterializer delegate; + + // the proxied root converter + private final FilteringGroupConverter rootConverter; + + // the predicate + private final IncrementallyUpdatedFilterPredicate filterPredicate; + + public FilteringRecordMaterializer( + RecordMaterializer delegate, + List columnIOs, + Map> valueInspectorsByColumn, + IncrementallyUpdatedFilterPredicate filterPredicate) { + + checkNotNull(columnIOs, "columnIOs"); + checkNotNull(valueInspectorsByColumn, "valueInspectorsByColumn"); + this.filterPredicate = checkNotNull(filterPredicate, "filterPredicate"); + this.delegate = checkNotNull(delegate, "delegate"); + + // keep track of which path of indices leads to which primitive column + Map, PrimitiveColumnIO> columnIOsByIndexFieldPath = new HashMap, PrimitiveColumnIO>(); + + for (PrimitiveColumnIO c : columnIOs) { + columnIOsByIndexFieldPath.put(getIndexFieldPathList(c), c); + } + + // create a proxy for the delegate's root converter + this.rootConverter = new FilteringGroupConverter( + delegate.getRootConverter(), Collections.emptyList(), valueInspectorsByColumn, columnIOsByIndexFieldPath); + } + + public static List getIndexFieldPathList(PrimitiveColumnIO c) { + return intArrayToList(c.getIndexFieldPath()); + } + + public static List intArrayToList(int[] arr) { + List list = new ArrayList(arr.length); + for (int i : arr) { + list.add(i); + } + return list; + } + + + + @Override + public T getCurrentRecord() { + + // find out if the predicate thinks we should keep this record + boolean keep = 
IncrementallyUpdatedFilterPredicateEvaluator.evaluate(filterPredicate); + + // reset the stateful predicate no matter what + IncrementallyUpdatedFilterPredicateResetter.reset(filterPredicate); + + if (keep) { + return delegate.getCurrentRecord(); + } else { + // signals a skip + return null; + } + } + + @Override + public void skipCurrentRecord() { + delegate.skipCurrentRecord(); + } + + @Override + public GroupConverter getRootConverter() { + return rootConverter; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java new file mode 100644 index 0000000..606c78f --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicate.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.filter2.recordlevel; + +import org.apache.parquet.io.api.Binary; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * A rewritten version of a {@link org.apache.parquet.filter2.predicate.FilterPredicate} which receives + * the values for a record's columns one by one and internally tracks whether the predicate is + * satisfied, unsatisfied, or unknown. + * + * This is used to apply a predicate during record assembly, without assembling a second copy of + * a record, and without building a stack of update events. + * + * IncrementallyUpdatedFilterPredicate is implemented via the visitor pattern, as is + * {@link org.apache.parquet.filter2.predicate.FilterPredicate} + */ +public interface IncrementallyUpdatedFilterPredicate { + + /** + * A Visitor for an {@link IncrementallyUpdatedFilterPredicate}, per the visitor pattern. + */ + public static interface Visitor { + boolean visit(ValueInspector p); + boolean visit(And and); + boolean visit(Or or); + } + + /** + * A {@link IncrementallyUpdatedFilterPredicate} must accept a {@link Visitor}, per the visitor pattern. + */ + boolean accept(Visitor visitor); + + /** + * This is the leaf node of a filter predicate. It receives the value for the primitive column it represents, + * and decides whether or not the predicate represented by this node is satisfied. + * + * It is stateful, and needs to be rest after use. 
+ */ + public static abstract class ValueInspector implements IncrementallyUpdatedFilterPredicate { + // package private constructor + ValueInspector() { } + + private boolean result = false; + private boolean isKnown = false; + + // these methods signal what the value is + public void updateNull() { throw new UnsupportedOperationException(); } + public void update(int value) { throw new UnsupportedOperationException(); } + public void update(long value) { throw new UnsupportedOperationException(); } + public void update(double value) { throw new UnsupportedOperationException(); } + public void update(float value) { throw new UnsupportedOperationException(); } + public void update(boolean value) { throw new UnsupportedOperationException(); } + public void update(Binary value) { throw new UnsupportedOperationException(); } + + /** + * Reset to clear state and begin evaluating the next record. + */ + public final void reset() { + isKnown = false; + result = false; + } + + /** + * Subclasses should call this method to signal that the result of this predicate is known. + */ + protected final void setResult(boolean result) { + if (isKnown) { + throw new IllegalStateException("setResult() called on a ValueInspector whose result is already known!" + + " Did you forget to call reset()?"); + } + this.result = result; + this.isKnown = true; + } + + /** + * Should only be called if {@link #isKnown} return true. + */ + public final boolean getResult() { + if (!isKnown) { + throw new IllegalStateException("getResult() called on a ValueInspector whose result is not yet known!"); + } + return result; + } + + /** + * Return true if this inspector has received a value yet, false otherwise. 
+ */ + public final boolean isKnown() { + return isKnown; + } + + @Override + public boolean accept(Visitor visitor) { + return visitor.visit(this); + } + } + + // base class for and / or + static abstract class BinaryLogical implements IncrementallyUpdatedFilterPredicate { + private final IncrementallyUpdatedFilterPredicate left; + private final IncrementallyUpdatedFilterPredicate right; + + BinaryLogical(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) { + this.left = checkNotNull(left, "left"); + this.right = checkNotNull(right, "right"); + } + + public final IncrementallyUpdatedFilterPredicate getLeft() { + return left; + } + + public final IncrementallyUpdatedFilterPredicate getRight() { + return right; + } + } + + public static final class Or extends BinaryLogical { + Or(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) { + super(left, right); + } + + @Override + public boolean accept(Visitor visitor) { + return visitor.visit(this); + } + } + + public static final class And extends BinaryLogical { + And(IncrementallyUpdatedFilterPredicate left, IncrementallyUpdatedFilterPredicate right) { + super(left, right); + } + + @Override + public boolean accept(Visitor visitor) { + return visitor.visit(this); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java new file mode 100644 index 0000000..8def88e --- /dev/null +++ 
b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateBuilderBase.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.filter2.recordlevel; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor; +import org.apache.parquet.filter2.predicate.Operators.And; +import org.apache.parquet.filter2.predicate.Operators.Not; +import org.apache.parquet.filter2.predicate.Operators.Or; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; + +import static org.apache.parquet.Preconditions.checkArgument; + +/** + * The implementation of this abstract class is auto-generated by + * {@link org.apache.parquet.filter2.IncrementallyUpdatedFilterPredicateGenerator} + * + * Constructs a {@link IncrementallyUpdatedFilterPredicate} from a {@link org.apache.parquet.filter2.predicate.FilterPredicate} + * This is how records are filtered during record assembly. 
The implementation is generated in order to avoid autoboxing. + * + * Note: the supplied predicate must not contain any instances of the not() operator as this is not + * supported by this filter. + * + * the supplied predicate should first be run through {@link org.apache.parquet.filter2.predicate.LogicalInverseRewriter} to rewrite it + * in a form that doesn't make use of the not() operator. + * + * the supplied predicate should also have already been run through + * {@link org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator} + * to make sure it is compatible with the schema of this file. + * + * TODO: UserDefinedPredicates still autobox however + */ +public abstract class IncrementallyUpdatedFilterPredicateBuilderBase implements Visitor { + private boolean built = false; + private final Map> valueInspectorsByColumn = new HashMap>(); + + public IncrementallyUpdatedFilterPredicateBuilderBase() { } + + public final IncrementallyUpdatedFilterPredicate build(FilterPredicate pred) { + checkArgument(!built, "This builder has already been used"); + IncrementallyUpdatedFilterPredicate incremental = pred.accept(this); + built = true; + return incremental; + } + + protected final void addValueInspector(ColumnPath columnPath, ValueInspector valueInspector) { + List valueInspectors = valueInspectorsByColumn.get(columnPath); + if (valueInspectors == null) { + valueInspectors = new ArrayList(); + valueInspectorsByColumn.put(columnPath, valueInspectors); + } + valueInspectors.add(valueInspector); + } + + public Map> getValueInspectorsByColumn() { + return valueInspectorsByColumn; + } + + @Override + public final IncrementallyUpdatedFilterPredicate visit(And and) { + return new IncrementallyUpdatedFilterPredicate.And(and.getLeft().accept(this), and.getRight().accept(this)); + } + + @Override + public final IncrementallyUpdatedFilterPredicate visit(Or or) { + return new IncrementallyUpdatedFilterPredicate.Or(or.getLeft().accept(this), or.getRight().accept(this)); + 
} + + @Override + public final IncrementallyUpdatedFilterPredicate visit(Not not) { + throw new IllegalArgumentException( + "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java new file mode 100644 index 0000000..d1aa66c --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateEvaluator.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.filter2.recordlevel; + +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Determines whether an {@link IncrementallyUpdatedFilterPredicate} is satisfied or not. + * This implementation makes the assumption that all {@link ValueInspector}s in an unknown state + * represent columns with a null value, and updates them accordingly. + * + * TODO: We could also build an evaluator that detects if enough values are known to determine the outcome + * TODO: of the predicate and quit the record assembly early. (https://issues.apache.org/jira/browse/PARQUET-37) + */ +public class IncrementallyUpdatedFilterPredicateEvaluator implements Visitor { + private static final IncrementallyUpdatedFilterPredicateEvaluator INSTANCE = new IncrementallyUpdatedFilterPredicateEvaluator(); + + public static boolean evaluate(IncrementallyUpdatedFilterPredicate pred) { + checkNotNull(pred, "pred"); + return pred.accept(INSTANCE); + } + + private IncrementallyUpdatedFilterPredicateEvaluator() {} + + @Override + public boolean visit(ValueInspector p) { + if (!p.isKnown()) { + p.updateNull(); + } + return p.getResult(); + } + + @Override + public boolean visit(And and) { + return and.getLeft().accept(this) && and.getRight().accept(this); + } + + @Override + public boolean visit(Or or) { + return or.getLeft().accept(this) || or.getRight().accept(this); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java 
---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java new file mode 100644 index 0000000..a75731a --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/filter2/recordlevel/IncrementallyUpdatedFilterPredicateResetter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.filter2.recordlevel; + +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.And; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Or; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.ValueInspector; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate.Visitor; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Resets all the {@link ValueInspector}s in a {@link IncrementallyUpdatedFilterPredicate}. 
+ */ +public final class IncrementallyUpdatedFilterPredicateResetter implements Visitor { + private static final IncrementallyUpdatedFilterPredicateResetter INSTANCE = new IncrementallyUpdatedFilterPredicateResetter(); + + public static void reset(IncrementallyUpdatedFilterPredicate pred) { + checkNotNull(pred, "pred"); + pred.accept(INSTANCE); + } + + private IncrementallyUpdatedFilterPredicateResetter() { } + + @Override + public boolean visit(ValueInspector p) { + p.reset(); + return false; + } + + @Override + public boolean visit(And and) { + and.getLeft().accept(this); + and.getRight().accept(this); + return false; + } + + @Override + public boolean visit(Or or) { + or.getLeft().accept(this); + or.getRight().accept(this); + return false; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java new file mode 100644 index 0000000..f2d88fc --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/BaseRecordReader.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import static org.apache.parquet.Log.DEBUG; +import org.apache.parquet.Log; +import org.apache.parquet.column.ColumnReadStore; +import org.apache.parquet.io.RecordReaderImplementation.State; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.io.api.RecordMaterializer; + +// TODO(julien): this class appears to be unused -- can it be nuked? - todd +public abstract class BaseRecordReader extends RecordReader { + private static final Log LOG = Log.getLog(BaseRecordReader.class); + + public RecordConsumer recordConsumer; + public RecordMaterializer recordMaterializer; + public ColumnReadStore columnStore; + @Override + public T read() { + readOneRecord(); + return recordMaterializer.getCurrentRecord(); + } + + protected abstract void readOneRecord(); + + State[] caseLookup; + + private String endField; + + private int endIndex; + + protected void currentLevel(int currentLevel) { + if (DEBUG) LOG.debug("currentLevel: "+currentLevel); + } + + protected void log(String message) { + if (DEBUG) LOG.debug("bc: "+message); + } + + final protected int getCaseId(int state, int currentLevel, int d, int nextR) { + return caseLookup[state].getCase(currentLevel, d, nextR).getID(); + } + + final protected void startMessage() { + // reset state + endField = null; + if (DEBUG) LOG.debug("startMessage()"); + recordConsumer.startMessage(); + } + + final protected void startGroup(String field, int index) { + startField(field, index); + if (DEBUG) LOG.debug("startGroup()"); + recordConsumer.startGroup(); + } + + private void startField(String field, int index) { + if (DEBUG) LOG.debug("startField("+field+","+index+")"); + if (endField != null && index == endIndex) { + // skip the close/open tag + endField = null; + } else { + if (endField != null) { + // close the previous field 
+ recordConsumer.endField(endField, endIndex); + endField = null; + } + recordConsumer.startField(field, index); + } + } + + final protected void addPrimitiveINT64(String field, int index, long value) { + startField(field, index); + if (DEBUG) LOG.debug("addLong("+value+")"); + recordConsumer.addLong(value); + endField(field, index); + } + + private void endField(String field, int index) { + if (DEBUG) LOG.debug("endField("+field+","+index+")"); + if (endField != null) { + recordConsumer.endField(endField, endIndex); + } + endField = field; + endIndex = index; + } + + final protected void addPrimitiveBINARY(String field, int index, Binary value) { + startField(field, index); + if (DEBUG) LOG.debug("addBinary("+value+")"); + recordConsumer.addBinary(value); + endField(field, index); + } + + final protected void addPrimitiveINT32(String field, int index, int value) { + startField(field, index); + if (DEBUG) LOG.debug("addInteger("+value+")"); + recordConsumer.addInteger(value); + endField(field, index); + } + + final protected void endGroup(String field, int index) { + if (endField != null) { + // close the previous field + recordConsumer.endField(endField, endIndex); + endField = null; + } + if (DEBUG) LOG.debug("endGroup()"); + recordConsumer.endGroup(); + endField(field, index); + } + + final protected void endMessage() { + if (endField != null) { + // close the previous field + recordConsumer.endField(endField, endIndex); + endField = null; + } + if (DEBUG) LOG.debug("endMessage()"); + recordConsumer.endMessage(); + } + + protected void error(String message) { + throw new ParquetDecodingException(message); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java new file 
mode 100644 index 0000000..95a969e --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIO.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + + +import java.util.Arrays; +import java.util.List; + +import org.apache.parquet.Log; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Type.Repetition; + +/** + * a structure used to serialize deserialize records + * + * @author Julien Le Dem + * + */ +abstract public class ColumnIO { + + static final boolean DEBUG = Log.DEBUG; + + private final GroupColumnIO parent; + private final Type type; + private final String name; + private final int index; + private int repetitionLevel; + private int definitionLevel; + private String[] fieldPath; + private int[] indexFieldPath; + + + ColumnIO(Type type, GroupColumnIO parent, int index) { + this.type = type; + this.parent = parent; + this.index = index; + this.name = type.getName(); + } + + String[] getFieldPath() { + return fieldPath; + } + + public String getFieldPath(int level) { + return fieldPath[level]; + } + + public int[] getIndexFieldPath() { + return indexFieldPath; + } + + public int getIndexFieldPath(int level) { + 
return indexFieldPath[level]; + } + + public int getIndex() { + return this.index; + } + + public String getName() { + return name; + } + + int getRepetitionLevel() { + return repetitionLevel; + } + + int getDefinitionLevel() { + return definitionLevel; + } + + void setRepetitionLevel(int repetitionLevel) { + this.repetitionLevel = repetitionLevel; + } + + void setDefinitionLevel(int definitionLevel) { + this.definitionLevel = definitionLevel; + } + + void setFieldPath(String[] fieldPath, int[] indexFieldPath) { + this.fieldPath = fieldPath; + this.indexFieldPath = indexFieldPath; + } + + public Type getType() { + return type; + } + + void setLevels(int r, int d, String[] fieldPath, int[] indexFieldPath, List repetition, List path) { + setRepetitionLevel(r); + setDefinitionLevel(d); + setFieldPath(fieldPath, indexFieldPath); + } + + abstract List getColumnNames(); + + public GroupColumnIO getParent() { + return parent; + } + + abstract PrimitiveColumnIO getLast(); + abstract PrimitiveColumnIO getFirst(); + + ColumnIO getParent(int r) { + if (getRepetitionLevel() == r && getType().isRepetition(Repetition.REPEATED)) { + return this; + } else if (getParent()!=null && getParent().getDefinitionLevel()>=r) { + return getParent().getParent(r); + } else { + throw new InvalidRecordException("no parent("+r+") for "+Arrays.toString(this.getFieldPath())); + } + } + + @Override + public String toString() { + return this.getClass().getSimpleName()+" "+type.getName() + +" r:"+repetitionLevel + +" d:"+definitionLevel + +" "+Arrays.toString(fieldPath); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java new file mode 100644 index 0000000..71af780 --- /dev/null +++ 
b/parquet-column/src/main/java/org/apache/parquet/io/ColumnIOFactory.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.TypeVisitor; + +/** + * Factory constructing the ColumnIO structure from the schema + * + * @author Julien Le Dem + * + */ +public class ColumnIOFactory { + + public class ColumnIOCreatorVisitor implements TypeVisitor { + + private MessageColumnIO columnIO; + private GroupColumnIO current; + private List leaves = new ArrayList(); + private final boolean validating; + private final MessageType requestedSchema; + private int currentRequestedIndex; + private Type currentRequestedType; + private boolean strictTypeChecking; + + public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema) { + this(validating, requestedSchema, true); + } + + public ColumnIOCreatorVisitor(boolean validating, MessageType requestedSchema, boolean strictTypeChecking) { + 
this.validating = validating; + this.requestedSchema = requestedSchema; + this.strictTypeChecking = strictTypeChecking; + } + + @Override + public void visit(MessageType messageType) { + columnIO = new MessageColumnIO(requestedSchema, validating); + visitChildren(columnIO, messageType, requestedSchema); + columnIO.setLevels(); + columnIO.setLeaves(leaves); + } + + @Override + public void visit(GroupType groupType) { + if (currentRequestedType.isPrimitive()) { + incompatibleSchema(groupType, currentRequestedType); + } + GroupColumnIO newIO = new GroupColumnIO(groupType, current, currentRequestedIndex); + current.add(newIO); + visitChildren(newIO, groupType, currentRequestedType.asGroupType()); + } + + private void visitChildren(GroupColumnIO newIO, GroupType groupType, GroupType requestedGroupType) { + GroupColumnIO oldIO = current; + current = newIO; + for (Type type : groupType.getFields()) { + // if the file schema does not contain the field it will just stay null + if (requestedGroupType.containsField(type.getName())) { + currentRequestedIndex = requestedGroupType.getFieldIndex(type.getName()); + currentRequestedType = requestedGroupType.getType(currentRequestedIndex); + if (currentRequestedType.getRepetition().isMoreRestrictiveThan(type.getRepetition())) { + incompatibleSchema(type, currentRequestedType); + } + type.accept(this); + } + } + current = oldIO; + } + + @Override + public void visit(PrimitiveType primitiveType) { + if (!currentRequestedType.isPrimitive() || + (this.strictTypeChecking && currentRequestedType.asPrimitiveType().getPrimitiveTypeName() != primitiveType.getPrimitiveTypeName())) { + incompatibleSchema(primitiveType, currentRequestedType); + } + PrimitiveColumnIO newIO = new PrimitiveColumnIO(primitiveType, current, currentRequestedIndex, leaves.size()); + current.add(newIO); + leaves.add(newIO); + } + + private void incompatibleSchema(Type fileType, Type requestedType) { + throw new ParquetDecodingException("The requested schema is not 
compatible with the file schema. incompatible types: " + requestedType + " != " + fileType); + } + + public MessageColumnIO getColumnIO() { + return columnIO; + } + + } + + private final boolean validating; + + /** + * validation is off by default + */ + public ColumnIOFactory() { + this(false); + } + + /** + * @param validating to turn validation on + */ + public ColumnIOFactory(boolean validating) { + super(); + this.validating = validating; + } + + /** + * @param schema the requestedSchema we want to read/write + * @param fileSchema the file schema (when reading it can be different from the requested schema) + * @return the corresponding serializing/deserializing structure + */ + public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema) { + return getColumnIO(requestedSchema, fileSchema, true); + } + + /** + * @param schema the requestedSchema we want to read/write + * @param fileSchema the file schema (when reading it can be different from the requested schema) + * @param strict should file type and requested primitive types match + * @return the corresponding serializing/deserializing structure + */ + public MessageColumnIO getColumnIO(MessageType requestedSchema, MessageType fileSchema, boolean strict) { + ColumnIOCreatorVisitor visitor = new ColumnIOCreatorVisitor(validating, requestedSchema, strict); + fileSchema.accept(visitor); + return visitor.getColumnIO(); + } + + /** + * @param schema the schema we want to read/write + * @return the corresponding serializing/deserializing structure + */ + public MessageColumnIO getColumnIO(MessageType schema) { + return this.getColumnIO(schema, schema); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java 
b/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java new file mode 100644 index 0000000..e15ab2e --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/CompilationException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.io; + +import org.apache.parquet.ParquetRuntimeException; + +/** + * thrown when a problem occured while compiling the column reader + * + * @author Julien Le Dem + * + */ +public class CompilationException extends ParquetRuntimeException { + private static final long serialVersionUID = 1L; + + public CompilationException() { + } + + public CompilationException(String message, Throwable cause) { + super(message, cause); + } + + public CompilationException(String message) { + super(message); + } + + public CompilationException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java new file mode 100644 index 0000000..671c651 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/EmptyRecordReader.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.io; + +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.io.api.RecordMaterializer; + +/** + * used to read empty schema + * + * @author Mickael Lacour + * + * @param the type of the materialized record + */ +class EmptyRecordReader extends RecordReader { + + private final GroupConverter recordConsumer; + private final RecordMaterializer recordMaterializer; + + public EmptyRecordReader(RecordMaterializer recordMaterializer) { + this.recordMaterializer = recordMaterializer; + this.recordConsumer = recordMaterializer.getRootConverter(); // TODO: validator(wrap(recordMaterializer), validating, root.getType()); + } + + /** + * @see org.apache.parquet.io.RecordReader#read() + */ + @Override + public T read() { + recordConsumer.start(); + recordConsumer.end(); + return recordMaterializer.getCurrentRecord(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java b/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java new file mode 100644 index 0000000..3444b1f --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/FilteredRecordReader.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import org.apache.parquet.column.ColumnReader; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.filter.RecordFilter; +import org.apache.parquet.filter.UnboundRecordFilter; +import org.apache.parquet.io.api.RecordMaterializer; + +/** + * Extends the + * @author Jacob Metcalf + * + */ +class FilteredRecordReader extends RecordReaderImplementation { + + private final RecordFilter recordFilter; + private final long recordCount; + private long recordsRead = 0; + + /** + * @param root the root of the schema + * @param validating + * @param columnStore + * @param unboundFilter Filter records, pass in NULL_FILTER to leave unfiltered. + */ + public FilteredRecordReader(MessageColumnIO root, RecordMaterializer recordMaterializer, boolean validating, + ColumnReadStoreImpl columnStore, UnboundRecordFilter unboundFilter, long recordCount) { + super(root, recordMaterializer, validating, columnStore); + this.recordCount = recordCount; + if ( unboundFilter != null ) { + recordFilter = unboundFilter.bind(getColumnReaders()); + } else { + recordFilter = null; + } + } + + /** + * Override read() method to provide skip. + */ + @Override + public T read() { + skipToMatch(); + if (recordsRead == recordCount) { + return null; + } + ++ recordsRead; + return super.read(); + } + + // FilteredRecordReader skips forwards itself, it never asks the layer above to do the skipping for it. 
+ // This is different from how filtering is handled in the filter2 API + @Override + public boolean shouldSkipCurrentRecord() { + return false; + } + + /** + * Skips forwards until the filter finds the first match. Returns false + * if none found. + */ + private void skipToMatch() { + while (recordsRead < recordCount && !recordFilter.isMatch()) { + State currentState = getState(0); + do { + ColumnReader columnReader = currentState.column; + + // currentLevel = depth + 1 at this point + // set the current value + if (columnReader.getCurrentDefinitionLevel() >= currentState.maxDefinitionLevel) { + columnReader.skip(); + } + columnReader.consume(); + + // Based on repetition level work out next state to go to + int nextR = currentState.maxRepetitionLevel == 0 ? 0 : columnReader.getCurrentRepetitionLevel(); + currentState = currentState.getNextState(nextR); + } while (currentState != null); + ++ recordsRead; + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java new file mode 100644 index 0000000..1efe0d1 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/GroupColumnIO.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import static org.apache.parquet.schema.Type.Repetition.REPEATED; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.parquet.Log; +import org.apache.parquet.schema.GroupType; + +/** + * Group level of the IO structure + * + * + * @author Julien Le Dem + * + */ +public class GroupColumnIO extends ColumnIO { + private static final Log LOG = Log.getLog(GroupColumnIO.class); + + private final Map childrenByName = new HashMap(); + private final List children = new ArrayList(); + private int childrenSize = 0; + + GroupColumnIO(GroupType groupType, GroupColumnIO parent, int index) { + super(groupType, parent, index); + } + + void add(ColumnIO child) { + children.add(child); + childrenByName.put(child.getType().getName(), child); + ++ childrenSize; + } + + @Override + void setLevels(int r, int d, String[] fieldPath, int[] indexFieldPath, List repetition, List path) { + super.setLevels(r, d, fieldPath, indexFieldPath, repetition, path); + for (ColumnIO child : this.children) { + String[] newFieldPath = Arrays.copyOf(fieldPath, fieldPath.length + 1); + int[] newIndexFieldPath = Arrays.copyOf(indexFieldPath, indexFieldPath.length + 1); + newFieldPath[fieldPath.length] = child.getType().getName(); + newIndexFieldPath[indexFieldPath.length] = child.getIndex(); + List newRepetition; + if (child.getType().isRepetition(REPEATED)) { + newRepetition = new 
ArrayList(repetition); + newRepetition.add(child); + } else { + newRepetition = repetition; + } + List newPath = new ArrayList(path); + newPath.add(child); + child.setLevels( + // the type repetition level increases whenever there's a possible repetition + child.getType().isRepetition(REPEATED) ? r + 1 : r, + // the type definition level increases whenever a field can be missing (not required) + !child.getType().isRepetition(REQUIRED) ? d + 1 : d, + newFieldPath, + newIndexFieldPath, + newRepetition, + newPath + ); + + } + } + + @Override + List getColumnNames() { + ArrayList result = new ArrayList(); + for (ColumnIO c : children) { + result.addAll(c.getColumnNames()); + } + return result; + } + + PrimitiveColumnIO getLast() { + return children.get(children.size()-1).getLast(); + } + + PrimitiveColumnIO getFirst() { + return children.get(0).getFirst(); + } + + public ColumnIO getChild(String name) { + return childrenByName.get(name); + } + + public ColumnIO getChild(int fieldIndex) { + try { + return children.get(fieldIndex); + } catch (IndexOutOfBoundsException e) { + throw new InvalidRecordException("could not get child " + fieldIndex + " from " + children, e); + } + } + + public int getChildrenCount() { + return childrenSize; + + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java b/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java new file mode 100644 index 0000000..d3d0111 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/InvalidRecordException.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import org.apache.parquet.ParquetRuntimeException; + +/** + * thrown when an invalid record is encountered + * + * @author Julien Le Dem + * + */ +public class InvalidRecordException extends ParquetRuntimeException { + private static final long serialVersionUID = 1L; + + public InvalidRecordException() { + super(); + } + + public InvalidRecordException(String message, Throwable cause) { + super(message, cause); + } + + public InvalidRecordException(String message) { + super(message); + } + + public InvalidRecordException(Throwable cause) { + super(cause); + } + +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java new file mode 100644 index 0000000..e24aedb --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/MessageColumnIO.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import java.util.Arrays; +import java.util.BitSet; +import java.util.List; + +import org.apache.parquet.Log; +import org.apache.parquet.column.ColumnWriteStore; +import org.apache.parquet.column.ColumnWriter; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter.UnboundRecordFilter; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat; +import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter; +import org.apache.parquet.filter2.compat.FilterCompat.UnboundRecordFilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Visitor; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.filter2.recordlevel.FilteringRecordMaterializer; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicate; +import org.apache.parquet.filter2.recordlevel.IncrementallyUpdatedFilterPredicateBuilder; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import org.apache.parquet.io.api.RecordMaterializer; +import 
org.apache.parquet.schema.MessageType; + +import static org.apache.parquet.Preconditions.checkNotNull; + +/** + * Message level of the IO structure + * + * + * @author Julien Le Dem + * + */ +public class MessageColumnIO extends GroupColumnIO { + private static final Log logger = Log.getLog(MessageColumnIO.class); + + private static final boolean DEBUG = Log.DEBUG; + + private List leaves; + + private final boolean validating; + + MessageColumnIO(MessageType messageType, boolean validating) { + super(messageType, null, 0); + this.validating = validating; + } + + public List getColumnNames() { + return super.getColumnNames(); + } + + public RecordReader getRecordReader(PageReadStore columns, + RecordMaterializer recordMaterializer) { + return getRecordReader(columns, recordMaterializer, FilterCompat.NOOP); + } + + /** + * @deprecated use {@link #getRecordReader(PageReadStore, RecordMaterializer, Filter)} + */ + @Deprecated + public RecordReader getRecordReader(PageReadStore columns, + RecordMaterializer recordMaterializer, + UnboundRecordFilter filter) { + return getRecordReader(columns, recordMaterializer, FilterCompat.get(filter)); + } + + public RecordReader getRecordReader(final PageReadStore columns, + final RecordMaterializer recordMaterializer, + final Filter filter) { + checkNotNull(columns, "columns"); + checkNotNull(recordMaterializer, "recordMaterializer"); + checkNotNull(filter, "filter"); + + if (leaves.isEmpty()) { + return new EmptyRecordReader(recordMaterializer); + } + + return filter.accept(new Visitor>() { + @Override + public RecordReader visit(FilterPredicateCompat filterPredicateCompat) { + + FilterPredicate predicate = filterPredicateCompat.getFilterPredicate(); + IncrementallyUpdatedFilterPredicateBuilder builder = new IncrementallyUpdatedFilterPredicateBuilder(); + IncrementallyUpdatedFilterPredicate streamingPredicate = builder.build(predicate); + RecordMaterializer filteringRecordMaterializer = new FilteringRecordMaterializer( + 
recordMaterializer, + leaves, + builder.getValueInspectorsByColumn(), + streamingPredicate); + + return new RecordReaderImplementation( + MessageColumnIO.this, + filteringRecordMaterializer, + validating, + new ColumnReadStoreImpl(columns, filteringRecordMaterializer.getRootConverter(), getType())); + } + + @Override + public RecordReader visit(UnboundRecordFilterCompat unboundRecordFilterCompat) { + return new FilteredRecordReader( + MessageColumnIO.this, + recordMaterializer, + validating, + new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()), + unboundRecordFilterCompat.getUnboundRecordFilter(), + columns.getRowCount() + ); + + } + + @Override + public RecordReader visit(NoOpFilter noOpFilter) { + return new RecordReaderImplementation( + MessageColumnIO.this, + recordMaterializer, + validating, + new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType())); + } + }); + } + + private class MessageColumnIORecordConsumer extends RecordConsumer { + private ColumnIO currentColumnIO; + private int currentLevel = 0; + + private class FieldsMarker { + private BitSet vistedIndexes = new BitSet(); + + @Override + public String toString() { + return "VistedIndex{" + + "vistedIndexes=" + vistedIndexes + + '}'; + } + + public void reset(int fieldsCount) { + this.vistedIndexes.clear(0, fieldsCount); + } + + public void markWritten(int i) { + vistedIndexes.set(i); + } + + public boolean isWritten(int i) { + return vistedIndexes.get(i); + } + } + + //track at each level of depth, which fields are written, so nulls can be inserted for the unwritten fields + private final FieldsMarker[] fieldsWritten; + private final int[] r; + private final ColumnWriter[] columnWriter; + private final ColumnWriteStore columns; + private boolean emptyField = true; + + public MessageColumnIORecordConsumer(ColumnWriteStore columns) { + this.columns = columns; + int maxDepth = 0; + this.columnWriter = new 
ColumnWriter[MessageColumnIO.this.getLeaves().size()]; + for (PrimitiveColumnIO primitiveColumnIO : MessageColumnIO.this.getLeaves()) { + maxDepth = Math.max(maxDepth, primitiveColumnIO.getFieldPath().length); + columnWriter[primitiveColumnIO.getId()] = columns.getColumnWriter(primitiveColumnIO.getColumnDescriptor()); + } + + fieldsWritten = new FieldsMarker[maxDepth]; + for (int i = 0; i < maxDepth; i++) { + fieldsWritten[i] = new FieldsMarker(); + } + r = new int[maxDepth]; + } + + public void printState() { + log(currentLevel + ", " + fieldsWritten[currentLevel] + ": " + Arrays.toString(currentColumnIO.getFieldPath()) + " r:" + r[currentLevel]); + if (r[currentLevel] > currentColumnIO.getRepetitionLevel()) { + // sanity check + throw new InvalidRecordException(r[currentLevel] + "(r) > " + currentColumnIO.getRepetitionLevel() + " ( schema r)"); + } + } + + private void log(Object m) { + String indent = ""; + for (int i = 0; i"); + currentColumnIO = MessageColumnIO.this; + r[0] = 0; + int numberOfFieldsToVisit = ((GroupColumnIO)currentColumnIO).getChildrenCount(); + fieldsWritten[0].reset(numberOfFieldsToVisit); + if (DEBUG) printState(); + } + + @Override + public void endMessage() { + writeNullForMissingFieldsAtCurrentLevel(); + columns.endRecord(); + if (DEBUG) log("< MESSAGE END >"); + if (DEBUG) printState(); + } + + @Override + public void startField(String field, int index) { + try { + if (DEBUG) log("startField(" + field + ", " + index + ")"); + currentColumnIO = ((GroupColumnIO)currentColumnIO).getChild(index); + emptyField = true; + if (DEBUG) printState(); + } catch (RuntimeException e) { + throw new ParquetEncodingException("error starting field " + field + " at " + index, e); + } + } + + @Override + public void endField(String field, int index) { + if (DEBUG) log("endField(" + field + ", " + index + ")"); + currentColumnIO = currentColumnIO.getParent(); + if (emptyField) { + throw new ParquetEncodingException("empty fields are illegal, the field 
should be ommited completely instead"); + } + fieldsWritten[currentLevel].markWritten(index); + r[currentLevel] = currentLevel == 0 ? 0 : r[currentLevel - 1]; + if (DEBUG) printState(); + } + + private void writeNullForMissingFieldsAtCurrentLevel() { + int currentFieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount(); + for (int i = 0; i < currentFieldsCount; i++) { + if (!fieldsWritten[currentLevel].isWritten(i)) { + try { + ColumnIO undefinedField = ((GroupColumnIO)currentColumnIO).getChild(i); + int d = currentColumnIO.getDefinitionLevel(); + if (DEBUG) + log(Arrays.toString(undefinedField.getFieldPath()) + ".writeNull(" + r[currentLevel] + "," + d + ")"); + writeNull(undefinedField, r[currentLevel], d); + } catch (RuntimeException e) { + throw new ParquetEncodingException("error while writing nulls for fields of indexes " + i + " . current index: " + fieldsWritten[currentLevel], e); + } + } + } + } + + private void writeNull(ColumnIO undefinedField, int r, int d) { + if (undefinedField.getType().isPrimitive()) { + columnWriter[((PrimitiveColumnIO)undefinedField).getId()].writeNull(r, d); + } else { + GroupColumnIO groupColumnIO = (GroupColumnIO)undefinedField; + int childrenCount = groupColumnIO.getChildrenCount(); + for (int i = 0; i < childrenCount; i++) { + writeNull(groupColumnIO.getChild(i), r, d); + } + } + } + + private void setRepetitionLevel() { + r[currentLevel] = currentColumnIO.getRepetitionLevel(); + if (DEBUG) log("r: " + r[currentLevel]); + } + + @Override + public void startGroup() { + if (DEBUG) log("startGroup()"); + + ++ currentLevel; + r[currentLevel] = r[currentLevel - 1]; + + int fieldsCount = ((GroupColumnIO)currentColumnIO).getChildrenCount(); + fieldsWritten[currentLevel].reset(fieldsCount); + if (DEBUG) printState(); + } + + @Override + public void endGroup() { + if (DEBUG) log("endGroup()"); + emptyField = false; + writeNullForMissingFieldsAtCurrentLevel(); + -- currentLevel; + + setRepetitionLevel(); + if (DEBUG) 
printState(); + } + + private ColumnWriter getColumnWriter() { + return columnWriter[((PrimitiveColumnIO)currentColumnIO).getId()]; + } + + @Override + public void addInteger(int value) { + if (DEBUG) log("addInt(" + value + ")"); + emptyField = false; + getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel()); + + setRepetitionLevel(); + if (DEBUG) printState(); + } + + @Override + public void addLong(long value) { + if (DEBUG) log("addLong(" + value + ")"); + emptyField = false; + getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel()); + + setRepetitionLevel(); + if (DEBUG) printState(); + } + + @Override + public void addBoolean(boolean value) { + if (DEBUG) log("addBoolean(" + value + ")"); + emptyField = false; + getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel()); + + setRepetitionLevel(); + if (DEBUG) printState(); + } + + @Override + public void addBinary(Binary value) { + if (DEBUG) log("addBinary(" + value.length() + " bytes)"); + emptyField = false; + getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel()); + + setRepetitionLevel(); + if (DEBUG) printState(); + } + + @Override + public void addFloat(float value) { + if (DEBUG) log("addFloat(" + value + ")"); + emptyField = false; + getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel()); + + setRepetitionLevel(); + if (DEBUG) printState(); + } + + @Override + public void addDouble(double value) { + if (DEBUG) log("addDouble(" + value + ")"); + emptyField = false; + getColumnWriter().write(value, r[currentLevel], currentColumnIO.getDefinitionLevel()); + + setRepetitionLevel(); + if (DEBUG) printState(); + } + + } + + public RecordConsumer getRecordWriter(ColumnWriteStore columns) { + RecordConsumer recordWriter = new MessageColumnIORecordConsumer(columns); + if (DEBUG) recordWriter = new RecordConsumerLoggingWrapper(recordWriter); + return validating ? 
new ValidatingRecordConsumer(recordWriter, getType()) : recordWriter; + } + + void setLevels() { + setLevels(0, 0, new String[0], new int[0], Arrays.asList(this), Arrays.asList(this)); + } + + void setLeaves(List leaves) { + this.leaves = leaves; + } + + public List getLeaves() { + return this.leaves; + } + + @Override + public MessageType getType() { + return (MessageType)super.getType(); + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b10870e4/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java ---------------------------------------------------------------------- diff --git a/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java b/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java new file mode 100644 index 0000000..1007e32 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/ParquetDecodingException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/**
 * Thrown when a decoding problem occurred while reading Parquet data.
 * (The original javadoc said "encoding" — it was swapped with
 * {@link ParquetEncodingException}'s description.)
 *
 * @author Julien Le Dem
 *
 */
public class ParquetDecodingException extends ParquetRuntimeException {
  private static final long serialVersionUID = 1L;

  public ParquetDecodingException() {
  }

  public ParquetDecodingException(String message, Throwable cause) {
    super(message, cause);
  }

  public ParquetDecodingException(String message) {
    super(message);
  }

  public ParquetDecodingException(Throwable cause) {
    super(cause);
  }

}
/**
 * Thrown when an encoding problem occurred while writing Parquet data.
 * (The original javadoc said "decoding" — it was swapped with
 * {@link ParquetDecodingException}'s description.)
 *
 * @author Julien Le Dem
 *
 */
public class ParquetEncodingException extends ParquetRuntimeException {
  private static final long serialVersionUID = 1L;

  public ParquetEncodingException() {
  }

  public ParquetEncodingException(String message, Throwable cause) {
    super(message, cause);
  }

  public ParquetEncodingException(String message) {
    super(message);
  }

  public ParquetEncodingException(Throwable cause) {
    super(cause);
  }

}