From: twalthr@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Wed, 31 Jan 2018 13:47:10 -0000
Subject: [1/2] flink git commit: [FLINK-8230] [orc] Fix NPEs when reading nested columns.

Repository: flink
Updated Branches: refs/heads/release-1.4 0cc0572c4 -> 110b86dd2
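For context before the diff: this first half of the change removes the old OrcUtils vectorized-read helpers (whose handling of nested columns is what FLINK-8230 fixes) and adds tests covering nested columns with nulls and repeating values. As a minimal usage sketch (the file path and schema below are invented for illustration, but the call sequence mirrors how the updated OrcRowInputFormatTest drives the format), reading a nested column looks roughly like this:

    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.orc.OrcRowInputFormat;
    import org.apache.flink.types.Row;
    import org.apache.hadoop.conf.Configuration;

    public class NestedReadSketch {
        public static void main(String[] args) throws Exception {
            // path and schema are assumptions for this sketch, not part of the patch
            OrcRowInputFormat format = new OrcRowInputFormat(
                "/tmp/test-data-nested.orc",
                "struct<int1:int,record1:struct<f1:int,f2:string>>",
                new Configuration());

            FileInputSplit[] splits = format.createInputSplits(1);
            format.openInputFormat();
            format.open(splits[0]);

            Row row = null;
            while (!format.reachedEnd()) {
                row = format.nextRecord(row);
                // a nested ORC struct surfaces as a nested Flink Row; with this fix a
                // null struct should arrive as null rather than tripping an NPE in the reader
                Row nested = (Row) row.getField(1);
            }
            format.close();
        }
    }

The removed implementation follows.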
http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java deleted file mode 100644 index cfb4e0e..0000000 --- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java +++ /dev/null @@ -1,1508 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.orc; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; -import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.MapTypeInfo; -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.types.Row; - -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.orc.TypeDescription; - -import java.lang.reflect.Array; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.sql.Date; -import java.sql.Timestamp; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.TimeZone; -import java.util.function.DoubleFunction; -import java.util.function.IntFunction; -import java.util.function.LongFunction; - -/** - * A class that provides utility methods for orc file reading. - */ -class OrcUtils { - - private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000 - private static final TimeZone LOCAL_TZ = TimeZone.getDefault(); - - /** - * Converts an ORC schema to a Flink TypeInformation. - * - * @param schema The ORC schema. - * @return The TypeInformation that corresponds to the ORC schema.
- */ - static TypeInformation schemaToTypeInfo(TypeDescription schema) { - switch (schema.getCategory()) { - case BOOLEAN: - return BasicTypeInfo.BOOLEAN_TYPE_INFO; - case BYTE: - return BasicTypeInfo.BYTE_TYPE_INFO; - case SHORT: - return BasicTypeInfo.SHORT_TYPE_INFO; - case INT: - return BasicTypeInfo.INT_TYPE_INFO; - case LONG: - return BasicTypeInfo.LONG_TYPE_INFO; - case FLOAT: - return BasicTypeInfo.FLOAT_TYPE_INFO; - case DOUBLE: - return BasicTypeInfo.DOUBLE_TYPE_INFO; - case DECIMAL: - return BasicTypeInfo.BIG_DEC_TYPE_INFO; - case STRING: - case CHAR: - case VARCHAR: - return BasicTypeInfo.STRING_TYPE_INFO; - case DATE: - return SqlTimeTypeInfo.DATE; - case TIMESTAMP: - return SqlTimeTypeInfo.TIMESTAMP; - case BINARY: - return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; - case STRUCT: - List fieldSchemas = schema.getChildren(); - TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()]; - for (int i = 0; i < fieldSchemas.size(); i++) { - fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i)); - } - String[] fieldNames = schema.getFieldNames().toArray(new String[]{}); - return new RowTypeInfo(fieldTypes, fieldNames); - case LIST: - TypeDescription elementSchema = schema.getChildren().get(0); - TypeInformation elementType = schemaToTypeInfo(elementSchema); - // arrays of primitive types are handled as object arrays to support null values - return ObjectArrayTypeInfo.getInfoFor(elementType); - case MAP: - TypeDescription keySchema = schema.getChildren().get(0); - TypeDescription valSchema = schema.getChildren().get(1); - TypeInformation keyType = schemaToTypeInfo(keySchema); - TypeInformation valType = schemaToTypeInfo(valSchema); - return new MapTypeInfo<>(keyType, valType); - case UNION: - throw new UnsupportedOperationException("UNION type is not supported yet."); - default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } - - /** - * Fills an ORC batch into an array of Row. - * - * @param rows The batch of rows need to be filled. - * @param schema The schema of the ORC data. - * @param batch The ORC data. - * @param selectedFields The list of selected ORC fields. - * @return The number of rows that were filled. - */ - static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) { - - int rowsToRead = Math.min((int) batch.count(), rows.length); - - List fieldTypes = schema.getChildren(); - // read each selected field - for (int rowIdx = 0; rowIdx < selectedFields.length; rowIdx++) { - int orcIdx = selectedFields[rowIdx]; - readField(rows, rowIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], null, rowsToRead); - } - return rowsToRead; - } - - /** - * Reads a vector of data into an array of objects. - * - * @param vals The array that needs to be filled. - * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled. - * Otherwise a -1 must be passed and the data is directly filled into the array. - * @param schema The schema of the vector to read. - * @param vector The vector to read. - * @param lengthVector If the vector is of type List or Map, the number of sub-elements to read for each field. Otherwise, it must be null. - * @param childCount The number of vector entries to read. - */ - private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, long[] lengthVector, int childCount) { - - // check the type of the vector to decide how to read it. 
- switch (schema.getCategory()) { - case BOOLEAN: - if (vector.noNulls) { - readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray); - } else { - readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray); - } - break; - case BYTE: - if (vector.noNulls) { - readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray); - } else { - readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray); - } - break; - case SHORT: - if (vector.noNulls) { - readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray); - } else { - readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray); - } - break; - case INT: - if (vector.noNulls) { - readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray); - } else { - readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray); - } - break; - case LONG: - if (vector.noNulls) { - readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray); - } else { - readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray); - } - break; - case FLOAT: - if (vector.noNulls) { - readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray); - } else { - readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray); - } - break; - case DOUBLE: - if (vector.noNulls) { - readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray); - } else { - readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray); - } - break; - case CHAR: - case VARCHAR: - case STRING: - if (vector.noNulls) { - readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount); - } else { - readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount); - } - break; - case DATE: - if (vector.noNulls) { - readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount); - } else { - readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount); - } - break; - case TIMESTAMP: - if (vector.noNulls) { - readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount); - } else { - readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount); - } - break; - case BINARY: - if (vector.noNulls) { - readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount); - } else { - readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount); - } - break; - case DECIMAL: - if (vector.noNulls) { - readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, 
childCount); - } else { - readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount); - } - break; - case STRUCT: - if (vector.noNulls) { - readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount); - } else { - readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount); - } - break; - case LIST: - if (vector.noNulls) { - readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount); - } else { - readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount); - } - break; - case MAP: - if (vector.noNulls) { - readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount); - } else { - readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount); - } - break; - case UNION: - throw new UnsupportedOperationException("UNION type not supported yet"); - default: - throw new IllegalArgumentException("Unknown type " + schema); - } - } - - private static void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount, - LongFunction reader, IntFunction array) { - - // check if the values need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - T repeatingValue = reader.apply(vector.vector[0]); - fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = reader.apply(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } else { // in a list - T[] temp; - int offset = 0; - if (vector.isRepeating) { // fill complete list with first value - T repeatingValue = reader.apply(vector.vector[0]); - for (int i = 0; offset < childCount; i++) { - temp = array.apply((int) lengthVector[i]); - Arrays.fill(temp, repeatingValue); - offset += temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } else { - for (int i = 0; offset < childCount; i++) { - temp = array.apply((int) lengthVector[i]); - for (int j = 0; j < temp.length; j++) { - temp[j] = reader.apply(vector.vector[offset++]); - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount, - DoubleFunction reader, IntFunction array) { - - // check if the values need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - T repeatingValue = reader.apply(vector.vector[0]); - fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = reader.apply(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } else { // in a list - T[] temp; - int offset = 0; - if (vector.isRepeating) { // fill complete list 
with first value - T repeatingValue = reader.apply(vector.vector[0]); - for (int i = 0; offset < childCount; i++) { - temp = array.apply((int) lengthVector[i]); - Arrays.fill(temp, repeatingValue); - offset += temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } else { - for (int i = 0; offset < childCount; i++) { - temp = array.apply((int) lengthVector[i]); - for (int j = 0; j < temp.length; j++) { - temp[j] = reader.apply(vector.vector[offset++]); - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) { - // check if the values need to be read into lists or as single values - if (lengthVector == null) { - if (bytes.isRepeating) { // fill complete column with first value - String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0]); - fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8)); - } - } - } - } else { - String[] temp; - int offset = 0; - if (bytes.isRepeating) { // fill complete list with first value - String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0], StandardCharsets.UTF_8); - for (int i = 0; offset < childCount; i++) { - temp = new String[(int) lengthVector[i]]; - Arrays.fill(temp, repeatingValue); - offset += temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } else { - for (int i = 0; offset < childCount; i++) { - temp = new String[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset], StandardCharsets.UTF_8); - offset++; - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) { - // check if the values need to be read into lists or as single values - if (lengthVector == null) { - if (bytes.isRepeating) { // fill complete column with first value - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - // don't reuse repeating val to avoid object mutation - vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - // don't reuse repeating val to avoid object mutation - rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0])); - } - } - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, 
readBinary(bytes.vector[i], bytes.start[i], bytes.length[i])); - } - } - } - } else { - byte[][] temp; - int offset = 0; - if (bytes.isRepeating) { // fill complete list with first value - for (int i = 0; offset < childCount; i++) { - temp = new byte[(int) lengthVector[i]][]; - for (int j = 0; j < temp.length; j++) { - temp[j] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]); - } - offset += temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } else { - for (int i = 0; offset < childCount; i++) { - temp = new byte[(int) lengthVector[i]][]; - for (int j = 0; j < temp.length; j++) { - temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]); - offset++; - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) { - - // check if the values need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - // do not reuse repeated value due to mutability of Date - vals[i] = readDate(vector.vector[0]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - // do not reuse repeated value due to mutability of Date - rows[i].setField(fieldIdx, readDate(vector.vector[0])); - } - } - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readDate(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, readDate(vector.vector[i])); - } - } - } - } else { // in a list - Date[] temp; - int offset = 0; - if (vector.isRepeating) { // fill complete list with first value - for (int i = 0; offset < childCount; i++) { - temp = new Date[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - temp[j] = readDate(vector.vector[0]); - } - offset += temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } else { - for (int i = 0; offset < childCount; i++) { - temp = new Date[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - temp[j] = readDate(vector.vector[offset++]); - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) { - - // check if the timestamps need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - // do not reuse value to prevent object mutation - vals[i] = readTimestamp(vector.time[0], vector.nanos[0]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - // do not reuse value to prevent object mutation - rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0])); - } - } - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - 
vals[i] = readTimestamp(vector.time[i], vector.nanos[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i])); - } - } - } - } else { - Timestamp[] temp; - int offset = 0; - if (vector.isRepeating) { // fill complete list with first value - for (int i = 0; offset < childCount; i++) { - temp = new Timestamp[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - // do not reuse value to prevent object mutation - temp[j] = readTimestamp(vector.time[0], vector.nanos[0]); - } - offset += temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } else { - for (int i = 0; offset < childCount; i++) { - temp = new Timestamp[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]); - offset++; - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) { - - // check if the decimals need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount); - } else { - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - vals[i] = readBigDecimal(vector.vector[i]); - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i])); - } - } - } - } else { - BigDecimal[] temp; - int offset = 0; - if (vector.isRepeating) { // fill complete list with first value - BigDecimal repeatingValue = readBigDecimal(vector.vector[0]); - for (int i = 0; offset < childCount; i++) { - temp = new BigDecimal[(int) lengthVector[i]]; - Arrays.fill(temp, repeatingValue); - offset += temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } else { - for (int i = 0; offset < childCount; i++) { - temp = new BigDecimal[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - temp[j] = readBigDecimal(vector.vector[offset++]); - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - } - - private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) { - - List childrenTypes = schema.getChildren(); - - int numFields = childrenTypes.size(); - // create a batch of Rows to read the structs - Row[] structs = new Row[childCount]; - // TODO: possible improvement: reuse existing Row objects - for (int i = 0; i < childCount; i++) { - structs[i] = new Row(numFields); - } - - // read struct fields - for (int i = 0; i < numFields; i++) { - readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount); - } - - // check if the structs need to be read into lists or as single values - if (lengthVector == null) { - if (fieldIdx == -1) { // set struct as an object - System.arraycopy(structs, 0, vals, 0, childCount); - } else { // set struct as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 
0; i < childCount; i++) { - rows[i].setField(fieldIdx, structs[i]); - } - } - } else { // struct in a list - int offset = 0; - Row[] temp; - for (int i = 0; offset < childCount; i++) { - temp = new Row[(int) lengthVector[i]]; - System.arraycopy(structs, offset, temp, 0, temp.length); - offset = offset + temp.length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) { - - TypeDescription fieldType = schema.getChildren().get(0); - // check if the list need to be read into lists or as single values - if (lengthVector == null) { - long[] lengthVectorNested = list.lengths; - readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount); - } else { // list in a list - Object[] nestedLists = new Object[childCount]; - // length vector for nested list - long[] lengthVectorNested = list.lengths; - // read nested list - readField(nestedLists, -1, fieldType, list.child, lengthVectorNested, list.childCount); - // get type of nestedList - Class classType = nestedLists[0].getClass(); - - // fill outer list with nested list - int offset = 0; - int length; - for (int i = 0; offset < childCount; i++) { - length = (int) lengthVector[i]; - Object[] temp = (Object[]) Array.newInstance(classType, length); - System.arraycopy(nestedLists, offset, temp, 0, length); - offset = offset + length; - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, long[] lengthVector, int childCount) { - - List fieldType = schema.getChildren(); - TypeDescription keyType = fieldType.get(0); - TypeDescription valueType = fieldType.get(1); - - ColumnVector keys = mapsVector.keys; - ColumnVector values = mapsVector.values; - Object[] keyRows = new Object[mapsVector.childCount]; - Object[] valueRows = new Object[mapsVector.childCount]; - - // read map keys and values - readField(keyRows, -1, keyType, keys, null, keyRows.length); - readField(valueRows, -1, valueType, values, null, valueRows.length); - - // check if the maps need to be read into lists or as single values - if (lengthVector == null) { - long[] lengthVectorMap = mapsVector.lengths; - int offset = 0; - - for (int i = 0; i < childCount; i++) { - long numMapEntries = lengthVectorMap[i]; - HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries); - offset += numMapEntries; - - if (fieldIdx == -1) { - vals[i] = map; - } else { - ((Row) vals[i]).setField(fieldIdx, map); - } - } - } else { // list of map - - long[] lengthVectorMap = mapsVector.lengths; - int mapOffset = 0; // offset of map element - int offset = 0; // offset of map - HashMap[] temp; - - for (int i = 0; offset < childCount; i++) { - temp = new HashMap[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - long numMapEntries = lengthVectorMap[offset]; - temp[j] = readHashMap(keyRows, valueRows, mapOffset, numMapEntries); - mapOffset += numMapEntries; - offset++; - } - if (fieldIdx == 1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - private static void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount, - LongFunction reader, IntFunction array) { - - // check if the 
values need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = reader.apply(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } - } else { // in a list - if (vector.isRepeating) { // // fill complete list with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array); - } else { - // column contain null values - int offset = 0; - T[] temp; - boolean[] isNullVector = vector.isNull; - for (int i = 0; offset < childCount; i++) { - temp = array.apply((int) lengthVector[i]); - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - offset++; - } else { - temp[j] = reader.apply(vector.vector[offset++]); - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount, - DoubleFunction reader, IntFunction array) { - - // check if the values need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = reader.apply(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, reader.apply(vector.vector[i])); - } - } - } - } - } else { // in a list - if (vector.isRepeating) { // // fill complete list with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array); - } else { - // column contain null values - int offset = 0; - T[] temp; - boolean[] isNullVector = vector.isNull; - for (int i = 0; offset < childCount; i++) { - temp = array.apply((int) lengthVector[i]); - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - offset++; - } else { - temp[j] = reader.apply(vector.vector[offset++]); - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) { - - // check if the values need to be read into lists or as 
single values - if (lengthVector == null) { - if (bytes.isRepeating) { // fill complete column with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - boolean[] isNullVector = bytes.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i])); - } - } - } - } - } else { // in a list - if (bytes.isRepeating) { // fill list with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::stringArray); - } else { - int offset = 0; - String[] temp; - boolean[] isNullVector = bytes.isNull; - for (int i = 0; offset < childCount; i++) { - temp = new String[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - offset++; - } else { - temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset]); - offset++; - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) { - - // check if the binary need to be read into lists or as single values - if (lengthVector == null) { - if (bytes.isRepeating) { // fill complete column with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - boolean[] isNullVector = bytes.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i])); - } - } - } - } - } else { - if (bytes.isRepeating) { // fill complete list with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::binaryArray); - } else { - int offset = 0; - byte[][] temp; - boolean[] isNullVector = bytes.isNull; - for (int i = 0; offset < childCount; i++) { - temp = new byte[(int) lengthVector[i]][]; - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - offset++; - } else { - temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]); - offset++; - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int 
childCount) { - - // check if the values need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readDate(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, readDate(vector.vector[i])); - } - } - } - } - } else { // in a list - if (vector.isRepeating) { // // fill complete list with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::dateArray); - } else { - // column contain null values - int offset = 0; - Date[] temp; - boolean[] isNullVector = vector.isNull; - for (int i = 0; offset < childCount; i++) { - temp = new Date[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - offset++; - } else { - temp[j] = readDate(vector.vector[offset++]); - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) { - - // check if the timestamps need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]); - vals[i] = ts; - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]); - rows[i].setField(fieldIdx, ts); - } - } - } - } - } else { - if (vector.isRepeating) { // fill complete list with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::timestampArray); - } else { - int offset = 0; - Timestamp[] temp; - boolean[] isNullVector = vector.isNull; - for (int i = 0; offset < childCount; i++) { - temp = new Timestamp[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - offset++; - } else { - temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]); - offset++; - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] 
lengthVector, int childCount) { - - // check if the decimals need to be read into lists or as single values - if (lengthVector == null) { - if (vector.isRepeating) { // fill complete column with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount); - } else { - boolean[] isNullVector = vector.isNull; - if (fieldIdx == -1) { // set as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readBigDecimal(vector.vector[i]); - } - } - } else { // set as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i])); - } - } - } - } - } else { - if (vector.isRepeating) { // fill complete list with first value - // since the column contains null values and has just one distinct value, the repeated value is null - fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::decimalArray); - } else { - int offset = 0; - BigDecimal[] temp; - boolean[] isNullVector = vector.isNull; - for (int i = 0; offset < childCount; i++) { - temp = new BigDecimal[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - offset++; - } else { - temp[j] = readBigDecimal(vector.vector[offset++]); - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - } - - private static void readStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) { - - List childrenTypes = schema.getChildren(); - - int numFields = childrenTypes.size(); - // create a batch of Rows to read the structs - Row[] structs = new Row[childCount]; - // TODO: possible improvement: reuse existing Row objects - for (int i = 0; i < childCount; i++) { - structs[i] = new Row(numFields); - } - - // read struct fields - for (int i = 0; i < numFields; i++) { - readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount); - } - - boolean[] isNullVector = structVector.isNull; - - // check if the structs need to be read into lists or as single values - if (lengthVector == null) { - if (fieldIdx == -1) { // set struct as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = structs[i]; - } - } - } else { // set struct as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, structs[i]); - } - } - } - } else { // struct in a list - int offset = 0; - Row[] temp; - for (int i = 0; offset < childCount; i++) { - temp = new Row[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - temp[j] = null; - } else { - temp[j] = structs[offset++]; - } - } - if (fieldIdx == -1) { // set list of structs as an object - vals[i] = temp; - } else { // set list of structs as field of row - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - private static void readListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) { - - TypeDescription fieldType = schema.getChildren().get(0); - // check if the lists need to 
be read into lists or as single values - if (lengthVector == null) { - long[] lengthVectorNested = list.lengths; - readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount); - } else { // list in a list - Object[] nestedList = new Object[childCount]; - // length vector for nested list - long[] lengthVectorNested = list.lengths; - // read nested list - readField(nestedList, -1, fieldType, list.child, lengthVectorNested, list.childCount); - - // fill outer list with nested list - int offset = 0; - int length; - // get type of nestedList - Class classType = nestedList[0].getClass(); - for (int i = 0; offset < childCount; i++) { - length = (int) lengthVector[i]; - Object[] temp = (Object[]) Array.newInstance(classType, length); - System.arraycopy(nestedList, offset, temp, 0, length); - offset = offset + length; - if (fieldIdx == -1) { // set list of list as an object - vals[i] = temp; - } else { // set list of list as field of row - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - private static void readMapColumn(Object[] vals, int fieldIdx, MapColumnVector map, TypeDescription schema, long[] lengthVector, int childCount) { - - List fieldType = schema.getChildren(); - TypeDescription keyType = fieldType.get(0); - TypeDescription valueType = fieldType.get(1); - - ColumnVector keys = map.keys; - ColumnVector values = map.values; - Object[] keyRows = new Object[map.childCount]; - Object[] valueRows = new Object[map.childCount]; - - // read map kes and values - readField(keyRows, -1, keyType, keys, null, keyRows.length); - readField(valueRows, -1, valueType, values, null, valueRows.length); - - boolean[] isNullVector = map.isNull; - - // check if the maps need to be read into lists or as single values - if (lengthVector == null) { - long[] lengthVectorMap = map.lengths; - int offset = 0; - if (fieldIdx == -1) { // set map as an object - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - vals[i] = null; - } else { - vals[i] = readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]); - offset += lengthVectorMap[i]; - } - } - } else { // set map as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - if (isNullVector[i]) { - rows[i].setField(fieldIdx, null); - } else { - rows[i].setField(fieldIdx, readHashMap(keyRows, valueRows, offset, lengthVectorMap[i])); - offset += lengthVectorMap[i]; - } - } - } - } else { // list of map - long[] lengthVectorMap = map.lengths; - int mapOffset = 0; // offset of map element - int offset = 0; // offset of map - HashMap[] temp; - - for (int i = 0; offset < childCount; i++) { - temp = new HashMap[(int) lengthVector[i]]; - for (int j = 0; j < temp.length; j++) { - if (isNullVector[offset]) { - temp[j] = null; - } else { - temp[j] = readHashMap(keyRows, valueRows, mapOffset, lengthVectorMap[offset]); - mapOffset += lengthVectorMap[offset]; - offset++; - } - } - if (fieldIdx == -1) { - vals[i] = temp; - } else { - ((Row) vals[i]).setField(fieldIdx, temp); - } - } - } - } - - /** - * Sets a repeating value to all objects or row fields of the passed vals array. - * - * @param vals The array of objects or Rows. - * @param fieldIdx If the objs array is an array of Row, the index of the field that needs to be filled. - * Otherwise a -1 must be passed and the data is directly filled into the array. - * @param repeatingValue The value that is set. - * @param childCount The number of times the value is set. 
- */ - private static void fillColumnWithRepeatingValue(Object[] vals, int fieldIdx, Object repeatingValue, int childCount) { - - if (fieldIdx == -1) { - // set value as an object - Arrays.fill(vals, 0, childCount, repeatingValue); - } else { - // set value as a field of Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, repeatingValue); - } - } - } - - /** - * Sets arrays containing only null values to all objects or row fields of the passed vals array. - * - * @param vals The array of objects or Rows to which the empty arrays are set. - * @param fieldIdx If the objs array is an array of Row, the index of the field that needs to be filled. - * Otherwise a -1 must be passed and the data is directly filled into the array. - * @param lengthVector The vector containing the lengths of the individual empty arrays. - * @param childCount The number of objects or Rows to fill. - * @param array A method to create arrays of the appropriate type. - * @param The type of the arrays to create. - */ - private static void fillListWithRepeatingNull(Object[] vals, int fieldIdx, long[] lengthVector, int childCount, IntFunction array) { - - if (fieldIdx == -1) { - // set empty array as object - for (int i = 0; i < childCount; i++) { - vals[i] = array.apply((int) lengthVector[i]); - } - } else { - // set empty array as field in Row - Row[] rows = (Row[]) vals; - for (int i = 0; i < childCount; i++) { - rows[i].setField(fieldIdx, array.apply((int) lengthVector[i])); - } - } - } - - private static Boolean readBoolean(long l) { - return l != 0; - } - - private static Byte readByte(long l) { - return (byte) l; - } - - private static Short readShort(long l) { - return (short) l; - } - - private static Integer readInt(long l) { - return (int) l; - } - - private static Long readLong(long l) { - return l; - } - - private static Float readFloat(double d) { - return (float) d; - } - - private static Double readDouble(double d) { - return d; - } - - private static Date readDate(long l) { - // day to milliseconds - final long t = l * MILLIS_PER_DAY; - // adjust by local timezone - return new java.sql.Date(t - LOCAL_TZ.getOffset(t)); - } - - private static byte[] readBinary(byte[] src, int srcPos, int length) { - byte[] result = new byte[length]; - System.arraycopy(src, srcPos, result, 0, length); - return result; - } - - private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) { - HiveDecimal hiveDecimal = hiveDecimalWritable.getHiveDecimal(); - return hiveDecimal.bigDecimalValue(); - } - - private static Timestamp readTimestamp(long time, int nanos) { - Timestamp ts = new Timestamp(time); - ts.setNanos(nanos); - return ts; - } - - private static HashMap readHashMap(Object[] keyRows, Object[] valueRows, int offset, long length) { - HashMap resultMap = new HashMap<>(); - for (int j = 0; j < length; j++) { - resultMap.put(keyRows[offset], valueRows[offset]); - offset++; - } - return resultMap; - } - - private static Boolean[] boolArray(int len) { - return new Boolean[len]; - } - - private static Byte[] byteArray(int len) { - return new Byte[len]; - } - - private static Short[] shortArray(int len) { - return new Short[len]; - } - - private static Integer[] intArray(int len) { - return new Integer[len]; - } - - private static Long[] longArray(int len) { - return new Long[len]; - } - - private static Float[] floatArray(int len) { - return new Float[len]; - } - - private static Double[] doubleArray(int len) { - return new Double[len]; - } - - private 
static Date[] dateArray(int len) { - return new Date[len]; - } - - private static byte[][] binaryArray(int len) { - return new byte[len][]; - } - - private static String[] stringArray(int len) { - return new String[len]; - } - - private static BigDecimal[] decimalArray(int len) { - return new BigDecimal[len]; - } - - private static Timestamp[] timestampArray(int len) { - return new Timestamp[len]; - } - -}
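Background for the new tests that follow: ORC's vectorized reader returns column vectors in which a single physical value can logically stand for every row of the batch (isRepeating), and null entries are flagged through noNulls/isNull. The new tests below target exactly these flags on nested (composite) columns. A standalone sketch of the batch shape, using only ORC API calls that the removed class above already imported (the class name is invented for illustration):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;

    public class RepeatingBatchSketch {
        public static void main(String[] args) {
            TypeDescription schema = TypeDescription.fromString("struct<int1:int>");
            VectorizedRowBatch batch = schema.createRowBatch();
            LongColumnVector col = (LongColumnVector) batch.cols[0];
            col.isRepeating = true;  // vector[0] logically applies to every row
            col.noNulls = true;      // this column contains no nulls
            col.vector[0] = 42L;
            batch.size = 3;          // three logical rows, one physical value
            // A correct reader expands this into three logical 42s and, for mutable
            // types such as Timestamp or Date, must hand out a fresh object per row,
            // which testReadRepeatingValuesFile below checks via reference inequality.
        }
    }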
http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java new file mode 100644 index 0000000..b90313e --- /dev/null +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.orc; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.orc.TypeDescription; +import org.junit.Assert; +import org.junit.Test; + +/** + * Unit tests for {@link OrcBatchReader}. + * + */ +public class OrcBatchReaderTest { + + @Test + public void testFlatSchemaToTypeInfo1() { + + String schema = + "struct<" + + "boolean1:boolean," + + "byte1:tinyint," + + "short1:smallint," + + "int1:int," + + "long1:bigint," + + "float1:float," + + "double1:double," + + "bytes1:binary," + + "string1:string," + + "date1:date," + + "timestamp1:timestamp," + + "decimal1:decimal(5,2)" + + ">"; + TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema)); + + Assert.assertNotNull(typeInfo); + Assert.assertTrue(typeInfo instanceof RowTypeInfo); + RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; + + // validate field types + Assert.assertArrayEquals( + new TypeInformation[]{ + Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING, + Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO + }, + rowTypeInfo.getFieldTypes()); + + // validate field names + Assert.assertArrayEquals( + new String[] { + "boolean1", "byte1", "short1", "int1", "long1", "float1", "double1", + "bytes1", "string1", "date1", "timestamp1", "decimal1" + }, + rowTypeInfo.getFieldNames()); + + } + + @Test + public void testNestedSchemaToTypeInfo1() { + + String schema = + "struct<" + + "middle:struct<" + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">," + + "list:array<" + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">," + + "map:map<" + + "string," + + "struct<" + + "int1:int," + + "string1:string" + + ">" + + ">" + + ">"; + TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema)); + + Assert.assertNotNull(typeInfo); + Assert.assertTrue(typeInfo instanceof RowTypeInfo); + RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; + + // validate field types + Assert.assertArrayEquals( + new TypeInformation[]{ + Types.ROW_NAMED( + new String[]{"list"}, + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ) + ), + ObjectArrayTypeInfo.getInfoFor( + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ), + new MapTypeInfo<>( + Types.STRING, + Types.ROW_NAMED( + new String[]{"int1", "string1"}, + Types.INT, Types.STRING + ) + ) + }, + rowTypeInfo.getFieldTypes()); + + // validate field names + Assert.assertArrayEquals( + new String[] {"middle", "list", "map"}, + rowTypeInfo.getFieldNames()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java index 0efe41f..2eb3231 100644 --- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java +++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; +import org.apache.flink.orc.util.OrcTestFileGenerator; import org.apache.flink.types.Row; import org.apache.flink.util.InstantiationUtil; @@ -50,6 +51,7
http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
index 0efe41f..2eb3231 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.util.OrcTestFileGenerator;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
@@ -124,6 +126,32 @@ public class OrcRowInputFormatTest {
 	private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc";
 	private static final String TEST_SCHEMA_NESTEDLIST = "struct>>>";
 
+	/** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithNullsFile(String)}. */
+	private static final String TEST_FILE_COMPOSITES_NULLS = "test-data-composites-with-nulls.orc";
+	private static final String TEST_SCHEMA_COMPOSITES_NULLS =
+		"struct<" +
+			"int1:int," +
+			"record1:struct<f1:int,f2:string>," +
+			"list1:array<array<array<struct<f1:string,f2:string>>>>," +
+			"list2:array<map<string,int>>" +
+		">";
+
+	/** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithRepeatingFile(String)}. */
+	private static final String TEST_FILE_REPEATING = "test-data-repeating.orc";
+	private static final String TEST_SCHEMA_REPEATING =
+		"struct<" +
+			"int1:int," +
+			"int2:int," +
+			"int3:int," +
+			"record1:struct<f1:int,f2:string>," +
+			"record2:struct<f1:int,f2:string>," +
+			"list1:array<int>," +
+			"list2:array<int>," +
+			"list3:array<int>," +
+			"map1:map<int,string>," +
+			"map2:map<int,string>" +
+		">";
+
 	@Test(expected = FileNotFoundException.class)
 	public void testInvalidPath() throws IOException{
 		rowOrcInputFormat =
@@ -477,7 +505,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadNestedFile() throws IOException{
+	public void testReadNestedFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -563,7 +591,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadTimeTypeFile() throws IOException{
+	public void testReadTimeTypeFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -590,7 +618,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadDecimalTypeFile() throws IOException{
+	public void testReadDecimalTypeFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -653,7 +681,183 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadWithProjection() throws IOException{
+	public void testReadCompositesNullsFile() throws Exception {
+		rowOrcInputFormat = new OrcRowInputFormat(
+			getPath(TEST_FILE_COMPOSITES_NULLS),
+			TEST_SCHEMA_COMPOSITES_NULLS,
+			new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+
+		Row row = null;
+		long cnt = 0;
+
+		int structNullCnt = 0;
+		int nestedListNullCnt = 0;
+		int mapListNullCnt = 0;
+
+		// read all rows
+		while (!rowOrcInputFormat.reachedEnd()) {
+
+			row = rowOrcInputFormat.nextRecord(row);
+			assertEquals(4, row.getArity());
+
+			assertTrue(row.getField(0) instanceof Integer);
+
+			if (row.getField(1) == null) {
+				structNullCnt++;
+			} else {
+				Object f = row.getField(1);
+				assertTrue(f instanceof Row);
+				assertEquals(2, ((Row) f).getArity());
+			}
+
+			if (row.getField(2) == null) {
+				nestedListNullCnt++;
+			} else {
+				Object f = row.getField(2);
+				assertTrue(f instanceof Row[][][]);
+				assertEquals(4, ((Row[][][]) f).length);
+			}
+
+			if (row.getField(3) == null) {
+				mapListNullCnt++;
+			} else {
+				Object f = row.getField(3);
+				assertTrue(f instanceof HashMap[]);
+				assertEquals(3, ((HashMap[]) f).length);
+			}
+			cnt++;
+		}
+		// number of rows in file
+		assertEquals(2500, cnt);
+		// check number of null fields
+		assertEquals(1250, structNullCnt);
+		assertEquals(835, nestedListNullCnt);
+		assertEquals(835, mapListNullCnt);
+	}
+
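A note on the expected null counts, assuming ORC's default batch size of 1024: the generator (added below) drives its modulo checks off the batch-local row index rather than the global row number, so the 2500 rows split into batches of 1024, 1024, and 452 local indices. `row % 2 == 0` then holds for 512 + 512 + 226 = 1250 indices and `row % 3 == 0` for 342 + 342 + 151 = 835, which is where the 1250 and 835 in the assertions come from (rather than the 1250 and 834 a global row count would give).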
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testReadRepeatingValuesFile() throws IOException {
+		rowOrcInputFormat = new OrcRowInputFormat(
+			getPath(TEST_FILE_REPEATING),
+			TEST_SCHEMA_REPEATING,
+			new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+
+		Row row = null;
+		long cnt = 0;
+
+		Row firstRow1 = null;
+		Integer[] firstList1 = null;
+		HashMap firstMap1 = null;
+
+		// read all rows
+		while (!rowOrcInputFormat.reachedEnd()) {
+
+			cnt++;
+			row = rowOrcInputFormat.nextRecord(row);
+			assertEquals(10, row.getArity());
+
+			// check first int field (always 42)
+			assertNotNull(row.getField(0));
+			assertTrue(row.getField(0) instanceof Integer);
+			assertEquals(42, ((Integer) row.getField(0)).intValue());
+
+			// check second int field (always null)
+			assertNull(row.getField(1));
+
+			// check third int field (always 99)
+			assertNotNull(row.getField(2));
+			assertTrue(row.getField(2) instanceof Integer);
+			assertEquals(99, ((Integer) row.getField(2)).intValue());
+
+			// check first row field (always (23, null))
+			assertNotNull(row.getField(3));
+			assertTrue(row.getField(3) instanceof Row);
+			Row nestedRow = (Row) row.getField(3);
+			// check first field of nested row
+			assertNotNull(nestedRow.getField(0));
+			assertTrue(nestedRow.getField(0) instanceof Integer);
+			assertEquals(23, ((Integer) nestedRow.getField(0)).intValue());
+			// check second field of nested row
+			assertNull(nestedRow.getField(1));
+			// validate reference
+			if (firstRow1 == null) {
+				firstRow1 = nestedRow;
+			} else {
+				// repeated rows must be different instances
+				assertTrue(firstRow1 != nestedRow);
+			}
+
+			// check second row field (always null)
+			assertNull(row.getField(4));
+
+			// check first list field (always [1, 2, 3])
+			assertNotNull(row.getField(5));
+			assertTrue(row.getField(5) instanceof Integer[]);
+			Integer[] list1 = ((Integer[]) row.getField(5));
+			assertEquals(1, list1[0].intValue());
+			assertEquals(2, list1[1].intValue());
+			assertEquals(3, list1[2].intValue());
+			// validate reference
+			if (firstList1 == null) {
+				firstList1 = list1;
+			} else {
+				// repeated lists must be different instances
+				assertTrue(firstList1 != list1);
+			}
+
+			// check second list field (always [7, 7, 7])
+			assertNotNull(row.getField(6));
+			assertTrue(row.getField(6) instanceof Integer[]);
+			Integer[] list2 = ((Integer[]) row.getField(6));
+			assertEquals(7, list2[0].intValue());
+			assertEquals(7, list2[1].intValue());
+			assertEquals(7, list2[2].intValue());
+
+			// check third list field (always null)
+			assertNull(row.getField(7));
+
+			// check first map field (always {2 -> "Hello", 4 -> "Hello"})
+			assertNotNull(row.getField(8));
+			assertTrue(row.getField(8) instanceof HashMap);
+			HashMap map = (HashMap) row.getField(8);
+			assertEquals(2, map.size());
+			assertEquals("Hello", map.get(2));
+			assertEquals("Hello", map.get(4));
+			// validate reference
+			if (firstMap1 == null) {
+				firstMap1 = map;
+			} else {
+				// repeated maps must be different instances
+				assertTrue(firstMap1 != map);
+			}
+
+			// check second map field (always null)
+			assertNull(row.getField(9));
+		}
+
+		rowOrcInputFormat.close();
+		rowOrcInputFormat.closeInputFormat();
+
+		assertEquals(256, cnt);
+	}
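The firstRow1/firstList1/firstMap1 reference checks above guard against a subtle reuse bug: Flink Row objects are mutable and the reader reuses the outer record, so the values materialized from a repeating ORC column must still be fresh instances per row. A schematic sketch of the failure mode being tested, with illustrative names only:

    import org.apache.flink.types.Row;

    final class InstanceSharingSketch {
        // Illustrative only: if the reader handed out one shared array for a
        // repeating list column, mutating it through one row would corrupt
        // every row materialized from the same batch.
        static void demo() {
            Integer[] shared = {1, 2, 3};
            Row rowA = new Row(1);
            Row rowB = new Row(1);
            rowA.setField(0, shared);
            rowB.setField(0, shared);         // wrong: rowA and rowB alias one array
            rowB.setField(0, shared.clone()); // right: per-row copy, as the test asserts
        }
    }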
+
+	@Test
+	public void testReadWithProjection() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
 		rowOrcInputFormat.selectFields(7, 0, 10, 8);
@@ -691,7 +895,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileInSplits() throws IOException{
+	public void testReadFileInSplits() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
 		rowOrcInputFormat.selectFields(0, 1);
@@ -717,7 +921,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileWithFilter() throws IOException{
+	public void testReadFileWithFilter() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
 		rowOrcInputFormat.selectFields(0, 1);
@@ -751,7 +955,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileWithEvolvedSchema() throws IOException{
+	public void testReadFileWithEvolvedSchema() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(
 			getPath(TEST_FILE_FLAT),

http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
deleted file mode 100644
index 2cb1715..0000000
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import org.apache.orc.TypeDescription;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link OrcUtils}.
- *
- */
-public class OrcUtilsTest {
-
-	@Test
-	public void testFlatSchemaToTypeInfo1() {
-
-		String schema =
-			"struct<" +
-				"boolean1:boolean," +
-				"byte1:tinyint," +
-				"short1:smallint," +
-				"int1:int," +
-				"long1:bigint," +
-				"float1:float," +
-				"double1:double," +
-				"bytes1:binary," +
-				"string1:string," +
-				"date1:date," +
-				"timestamp1:timestamp," +
-				"decimal1:decimal(5,2)" +
-			">";
-		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
-		Assert.assertNotNull(typeInfo);
-		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
-		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
-		// validate field types
-		Assert.assertArrayEquals(
-			new TypeInformation[]{
-				Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
-				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
-				Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
-			},
-			rowTypeInfo.getFieldTypes());
-
-		// validate field names
-		Assert.assertArrayEquals(
-			new String[] {
-				"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
-				"bytes1", "string1", "date1", "timestamp1", "decimal1"
-			},
-			rowTypeInfo.getFieldNames());
-
-	}
-
-	@Test
-	public void testNestedSchemaToTypeInfo1() {
-
-		String schema =
-			"struct<" +
-				"middle:struct<" +
-					"list:array<" +
-						"struct<" +
-							"int1:int," +
-							"string1:string" +
-						">" +
-					">" +
-				">," +
-				"list:array<" +
-					"struct<" +
-						"int1:int," +
-						"string1:string" +
-					">" +
-				">," +
-				"map:map<" +
-					"string," +
-					"struct<" +
-						"int1:int," +
-						"string1:string" +
-					">" +
-				">" +
-			">";
-		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
-		Assert.assertNotNull(typeInfo);
-		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
-		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
-		// validate field types
-		Assert.assertArrayEquals(
-			new TypeInformation[]{
-				Types.ROW_NAMED(
-					new String[]{"list"},
-					ObjectArrayTypeInfo.getInfoFor(
-						Types.ROW_NAMED(
-							new String[]{"int1", "string1"},
-							Types.INT, Types.STRING
-						)
-					)
-				),
-				ObjectArrayTypeInfo.getInfoFor(
-					Types.ROW_NAMED(
-						new String[]{"int1", "string1"},
-						Types.INT, Types.STRING
-					)
-				),
-				new MapTypeInfo<>(
-					Types.STRING,
-					Types.ROW_NAMED(
-						new String[]{"int1", "string1"},
-						Types.INT, Types.STRING
-					)
-				)
-			},
-			rowTypeInfo.getFieldTypes());
-
-		// validate field names
-		Assert.assertArrayEquals(
-			new String[] {"middle", "list", "map"},
-			rowTypeInfo.getFieldNames());
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
new file mode 100644
index 0000000..9d3be63
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.util;
+
+import org.apache.flink.orc.OrcRowInputFormatTest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A generator for ORC test files.
+ */
+public class OrcTestFileGenerator {
+
+	public static void main(String[] args) throws IOException {
+		writeCompositeTypesWithNullsFile(args[0]);
+//		writeCompositeTypesWithRepeatingFile(args[0]);
+	}
+
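The generator is run by hand whenever the checked-in .orc test resources need to be regenerated; only one of the two write methods is active at a time, the other stays commented out in main. A hypothetical invocation, with the class path deliberately elided:

    java -cp ... org.apache.flink.orc.util.OrcTestFileGenerator /tmp/test-data-composites-with-nulls.orc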
+	/**
+	 * Writes an ORC file with nested composite types and null values on different levels.
+	 * Generates {@link OrcRowInputFormatTest#TEST_FILE_COMPOSITES_NULLS}.
+	 */
+	private static void writeCompositeTypesWithNullsFile(String path) throws IOException {
+
+		Path filePath = new Path(path);
+		Configuration conf = new Configuration();
+
+		TypeDescription schema =
+			TypeDescription.fromString(
+				"struct<" +
+					"int1:int," +
+					"record1:struct<f1:int,f2:string>," +
+					"list1:array<array<array<struct<f1:string,f2:string>>>>," +
+					"list2:array<map<string,int>>" +
+				">");
+
+		Writer writer =
+			OrcFile.createWriter(filePath,
+				OrcFile.writerOptions(conf).setSchema(schema));
+
+		VectorizedRowBatch batch = schema.createRowBatch();
+		LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+
+		StructColumnVector record1 = (StructColumnVector) batch.cols[1];
+		LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+		BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+
+		ListColumnVector list1 = (ListColumnVector) batch.cols[2];
+		ListColumnVector nestedList = (ListColumnVector) list1.child;
+		ListColumnVector nestedList2 = (ListColumnVector) nestedList.child;
+		StructColumnVector listEntries = (StructColumnVector) nestedList2.child;
+		BytesColumnVector entryField1 = (BytesColumnVector) listEntries.fields[0];
+		BytesColumnVector entryField2 = (BytesColumnVector) listEntries.fields[1];
+
+		ListColumnVector list2 = (ListColumnVector) batch.cols[3];
+		MapColumnVector map1 = (MapColumnVector) list2.child;
+		BytesColumnVector keys = (BytesColumnVector) map1.keys;
+		LongColumnVector vals = (LongColumnVector) map1.values;
+
+		final int list1Size = 4;
+		final int nestedListSize = 3;
+		final int nestedList2Size = 2;
+		final int list2Size = 3;
+		final int mapSize = 3;
+
+		final int batchSize = batch.getMaxSize();
+
+		// Ensure the vectors have sufficient capacity
+		nestedList.ensureSize(batchSize * list1Size, false);
+		nestedList2.ensureSize(batchSize * list1Size * nestedListSize, false);
+		listEntries.ensureSize(batchSize * list1Size * nestedListSize * nestedList2Size, false);
+		map1.ensureSize(batchSize * list2Size, false);
+		keys.ensureSize(batchSize * list2Size * mapSize, false);
+		vals.ensureSize(batchSize * list2Size * mapSize, false);
+
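The loop below leans on ListColumnVector's offset bookkeeping, which is easy to misread: row r's list occupies child slots offsets[r] through offsets[r] + lengths[r] - 1, and childCount is the cursor for the next free child slot. A minimal sketch of that contract in isolation (the helper class itself is illustrative, the vector fields are the real Hive API):

    import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    final class ListVectorSketch {
        // Appends one list value for `row`: reserves a contiguous range in the
        // child vector via childCount, then records its start and length.
        static void putList(ListColumnVector list, long[] values, int row) {
            LongColumnVector child = (LongColumnVector) list.child;
            list.offsets[row] = list.childCount;
            list.lengths[row] = values.length;
            for (long v : values) {
                child.vector[list.childCount++] = v; // may need child.ensureSize first
            }
        }
    }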
+		// add 2500 rows to file
+		for (int r = 0; r < 2500; ++r) {
+			int row = batch.size++;
+
+			// mark nullable fields
+			list1.noNulls = false;
+			nestedList.noNulls = false;
+			listEntries.noNulls = false;
+			entryField1.noNulls = false;
+			record1.noNulls = false;
+			record1F2.noNulls = false;
+			list2.noNulls = false;
+			map1.noNulls = false;
+			keys.noNulls = false;
+			vals.noNulls = false;
+
+			// first field: int1
+			int1.vector[row] = r;
+
+			// second field: struct<f1:int,f2:string>
+			if (row % 2 != 0) {
+				// in every second row, the struct is null
+				record1F1.vector[row] = row;
+				if (row % 5 != 0) {
+					// in every fifth row, the second field of the struct is null
+					record1F2.setVal(row, ("f2-" + row).getBytes(StandardCharsets.UTF_8));
+				} else {
+					record1F2.isNull[row] = true;
+				}
+			} else {
+				record1.isNull[row] = true;
+			}
+
+			// third field: deeply nested list
+			if (row % 3 != 0) {
+				// in every third row, the nested list is null
+				list1.offsets[row] = list1.childCount;
+				list1.lengths[row] = list1Size;
+				list1.childCount += list1Size;
+
+				for (int i = 0; i < list1Size; i++) {
+
+					int listOffset = (int) list1.offsets[row] + i;
+					if (i != 2) {
+						// the third nested list (index 2) is always null
+						nestedList.offsets[listOffset] = nestedList.childCount;
+						nestedList.lengths[listOffset] = nestedListSize;
+						nestedList.childCount += nestedListSize;
+
+						for (int j = 0; j < nestedListSize; j++) {
+							int nestedOffset = (int) nestedList.offsets[listOffset] + j;
+							nestedList2.offsets[nestedOffset] = nestedList2.childCount;
+							nestedList2.lengths[nestedOffset] = nestedList2Size;
+							nestedList2.childCount += nestedList2Size;
+
+							for (int k = 0; k < nestedList2Size; k++) {
+								int nestedOffset2 = (int) nestedList2.offsets[nestedOffset] + k;
+								// list entries
+								if (k != 1) {
+									// the second struct (index 1) is always null
+									if (k != 0) {
+										// first struct field in first struct is always null
+										entryField1.setVal(nestedOffset2, ("f1-" + k).getBytes(StandardCharsets.UTF_8));
+									} else {
+										entryField1.isNull[nestedOffset2] = true;
+									}
+									entryField2.setVal(nestedOffset2, ("f2-" + k).getBytes(StandardCharsets.UTF_8));
+								} else {
+									listEntries.isNull[nestedOffset2] = true;
+								}
+							}
+						}
+					} else {
+						nestedList.isNull[listOffset] = true;
+					}
+				}
+			} else {
+				list1.isNull[row] = true;
+			}
+
+			// fourth field: map in list
+			if (row % 3 != 0) {
+				// in every third row, the map list is null
+				list2.offsets[row] = list2.childCount;
+				list2.lengths[row] = list2Size;
+				list2.childCount += list2Size;
+
+				for (int i = 0; i < list2Size; i++) {
+					int mapOffset = (int) list2.offsets[row] + i;
+
+					if (i != 2) {
+						// the third map list entry (index 2) is always null
+						map1.offsets[mapOffset] = map1.childCount;
+						map1.lengths[mapOffset] = mapSize;
+						map1.childCount += mapSize;
+
+						for (int j = 0; j < mapSize; j++) {
+							int mapEntryOffset = (int) map1.offsets[mapOffset] + j;
+
+							if (j != 1) {
+								// key in second map entry is always null
+								keys.setVal(mapEntryOffset, ("key-" + row + "-" + j).getBytes(StandardCharsets.UTF_8));
+							} else {
+								keys.isNull[mapEntryOffset] = true;
+							}
+							if (j != 2) {
+								// value in third map entry is always null
+								vals.vector[mapEntryOffset] = row + i + j;
+							} else {
+								vals.isNull[mapEntryOffset] = true;
+							}
+						}
+					} else {
+						map1.isNull[mapOffset] = true;
+					}
+				}
+			} else {
+				list2.isNull[row] = true;
+			}
+
+			if (row == batchSize - 1) {
+				writer.addRowBatch(batch);
+				batch.reset();
+			}
+		}
+		if (batch.size != 0) {
+			writer.addRowBatch(batch);
+			batch.reset();
+		}
+		writer.close();
+	}
+
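One detail worth calling out from the method above: noNulls defaults to true on every ColumnVector, and readers ignore isNull[] while it stays true, which is why the generator flips noNulls to false on every nullable vector before setting any individual isNull flag. The pattern in isolation (a sketch, using the real Hive vector fields):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    final class NullMarkingSketch {
        static void markNull(LongColumnVector col, int row) {
            col.noNulls = false;    // without this, isNull[] is ignored entirely
            col.isNull[row] = true; // now the individual entry counts as null
        }
    }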
+	/**
+	 * Writes an ORC file with nested composite types and repeated values.
+	 * Generates {@link OrcRowInputFormatTest#TEST_FILE_REPEATING}.
+	 */
+	private static void writeCompositeTypesWithRepeatingFile(String path) throws IOException {
+
+		Path filePath = new Path(path);
+		Configuration conf = new Configuration();
+
+		TypeDescription schema =
+			TypeDescription.fromString(
+				"struct<" +
+					"int1:int," +
+					"int2:int," +
+					"int3:int," +
+					"record1:struct<f1:int,f2:string>," +
+					"record2:struct<f1:int,f2:string>," +
+					"list1:array<int>," +
+					"list2:array<int>," +
+					"list3:array<int>," +
+					"map1:map<int,string>," +
+					"map2:map<int,string>" +
+				">");
+
+		Writer writer =
+			OrcFile.createWriter(filePath,
+				OrcFile.writerOptions(conf).setSchema(schema));
+
+		VectorizedRowBatch batch = schema.createRowBatch();
+
+		LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+		LongColumnVector int2 = (LongColumnVector) batch.cols[1];
+		LongColumnVector int3 = (LongColumnVector) batch.cols[2];
+
+		StructColumnVector record1 = (StructColumnVector) batch.cols[3];
+		LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+		BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+		StructColumnVector record2 = (StructColumnVector) batch.cols[4];
+
+		ListColumnVector list1 = (ListColumnVector) batch.cols[5];
+		LongColumnVector list1int = (LongColumnVector) list1.child;
+		ListColumnVector list2 = (ListColumnVector) batch.cols[6];
+		LongColumnVector list2int = (LongColumnVector) list2.child;
+		ListColumnVector list3 = (ListColumnVector) batch.cols[7];
+
+		MapColumnVector map1 = (MapColumnVector) batch.cols[8];
+		LongColumnVector map1keys = (LongColumnVector) map1.keys;
+		BytesColumnVector map1vals = (BytesColumnVector) map1.values;
+		MapColumnVector map2 = (MapColumnVector) batch.cols[9];
+
+		final int listSize = 3;
+		final int mapSize = 2;
+
+		final int batchSize = batch.getMaxSize();
+
+		// Ensure the vectors have sufficient capacity
+		list1int.ensureSize(batchSize * listSize, false);
+		list2int.ensureSize(batchSize * listSize, false);
+		map1keys.ensureSize(batchSize * mapSize, false);
+		map1vals.ensureSize(batchSize * mapSize, false);
+
+		// int1: all values are 42
+		int1.noNulls = true;
+		int1.setRepeating(true);
+		int1.vector[0] = 42;
+
+		// int2: all values are null
+		int2.noNulls = false;
+		int2.setRepeating(true);
+		int2.isNull[0] = true;
+
+		// int3: all values are 99
+		int3.noNulls = false;
+		int3.setRepeating(true);
+		int3.isNull[0] = false;
+		int3.vector[0] = 99;
+
+		// record1: all records are (23, null)
+		record1.noNulls = true;
+		record1.setRepeating(true);
+		for (int i = 0; i < batchSize; i++) {
+			record1F1.vector[i] = i + 23;
+		}
+		record1F2.noNulls = false;
+		record1F2.isNull[0] = true;
+
+		// record2: all records are null
+		record2.noNulls = false;
+		record2.setRepeating(true);
+		record2.isNull[0] = true;
+
+		// list1: all lists are [1, 2, 3]
+		list1.noNulls = true;
+		list1.setRepeating(true);
+		list1.lengths[0] = listSize;
+		list1.offsets[0] = 1;
+		for (int i = 0; i < batchSize * listSize; i++) {
+			list1int.vector[i] = i;
+		}
+
+		// list2: all lists are [7, 7, 7]
+		list2.noNulls = true;
+		list2.setRepeating(true);
+		list2.lengths[0] = listSize;
+		list2.offsets[0] = 0;
+		list2int.setRepeating(true);
+		list2int.vector[0] = 7;
+
+		// list3: all lists are null
+		list3.noNulls = false;
+		list3.setRepeating(true);
+		list3.isNull[0] = true;
+
+		// map1: all maps are {2 -> "Hello", 4 -> "Hello"}
+		map1.noNulls = true;
+		map1.setRepeating(true);
+		map1.lengths[0] = mapSize;
+		map1.offsets[0] = 1;
+		for (int i = 0; i < batchSize * mapSize; i++) {
+			map1keys.vector[i] = i * 2;
+		}
+		map1vals.setRepeating(true);
+		map1vals.setVal(0, "Hello".getBytes(StandardCharsets.UTF_8));
+
+		// map2: all maps are null
+		map2.noNulls = false;
+		map2.setRepeating(true);
+		map2.isNull[0] = true;
+
+		batch.size = 256;
+
+		writer.addRowBatch(batch);
+		batch.reset();
+		writer.close();
+	}
+
+}
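For completeness, the consumer-side view of the repeating flag set throughout this method: when isRepeating is true, readers must take the value (or null flag) at index 0 for every row in the batch, which is exactly what lets the file above encode 256 identical rows in a single batch. A minimal reader sketch under that contract (the helper class is illustrative):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    final class RepeatingReadSketch {
        // Boxed read that honors both the repeating and the null flags.
        static Long read(LongColumnVector col, int row) {
            int idx = col.isRepeating ? 0 : row; // repeating: index 0 speaks for all rows
            if (!col.noNulls && col.isNull[idx]) {
                return null;
            }
            return col.vector[idx];
        }
    }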
http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
new file mode 100644
index 0000000..eed1c55
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc differ

http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
new file mode 100644
index 0000000..ff2c917
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc differ