flink-commits mailing list archives

From twal...@apache.org
Subject [1/2] flink git commit: [FLINK-8230] [orc] Fix NPEs when reading nested columns.
Date Wed, 31 Jan 2018 13:47:10 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.4 0cc0572c4 -> 110b86dd2


http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java b/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
deleted file mode 100644
index cfb4e0e..0000000
--- a/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcUtils.java
+++ /dev/null
@@ -1,1508 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.types.Row;
-
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.orc.TypeDescription;
-
-import java.lang.reflect.Array;
-import java.math.BigDecimal;
-import java.nio.charset.StandardCharsets;
-import java.sql.Date;
-import java.sql.Timestamp;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.function.DoubleFunction;
-import java.util.function.IntFunction;
-import java.util.function.LongFunction;
-
-/**
- * A class that provides utility methods for ORC file reading.
- */
-class OrcUtils {
-
-	private static final long MILLIS_PER_DAY = 86400000; // = 24 * 60 * 60 * 1000
-	private static final TimeZone LOCAL_TZ = TimeZone.getDefault();
-
-	/**
-	 * Converts an ORC schema to a Flink TypeInformation.
-	 *
-	 * @param schema The ORC schema.
-	 * @return The TypeInformation that corresponds to the ORC schema.
-	 */
-	static TypeInformation schemaToTypeInfo(TypeDescription schema) {
-		switch (schema.getCategory()) {
-			case BOOLEAN:
-				return BasicTypeInfo.BOOLEAN_TYPE_INFO;
-			case BYTE:
-				return BasicTypeInfo.BYTE_TYPE_INFO;
-			case SHORT:
-				return BasicTypeInfo.SHORT_TYPE_INFO;
-			case INT:
-				return BasicTypeInfo.INT_TYPE_INFO;
-			case LONG:
-				return BasicTypeInfo.LONG_TYPE_INFO;
-			case FLOAT:
-				return BasicTypeInfo.FLOAT_TYPE_INFO;
-			case DOUBLE:
-				return BasicTypeInfo.DOUBLE_TYPE_INFO;
-			case DECIMAL:
-				return BasicTypeInfo.BIG_DEC_TYPE_INFO;
-			case STRING:
-			case CHAR:
-			case VARCHAR:
-				return BasicTypeInfo.STRING_TYPE_INFO;
-			case DATE:
-				return SqlTimeTypeInfo.DATE;
-			case TIMESTAMP:
-				return SqlTimeTypeInfo.TIMESTAMP;
-			case BINARY:
-				return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
-			case STRUCT:
-				List<TypeDescription> fieldSchemas = schema.getChildren();
-				TypeInformation[] fieldTypes = new TypeInformation[fieldSchemas.size()];
-				for (int i = 0; i < fieldSchemas.size(); i++) {
-					fieldTypes[i] = schemaToTypeInfo(fieldSchemas.get(i));
-				}
-				String[] fieldNames = schema.getFieldNames().toArray(new String[]{});
-				return new RowTypeInfo(fieldTypes, fieldNames);
-			case LIST:
-				TypeDescription elementSchema = schema.getChildren().get(0);
-				TypeInformation<?> elementType = schemaToTypeInfo(elementSchema);
-				// arrays of primitive types are handled as object arrays to support null values
-				return ObjectArrayTypeInfo.getInfoFor(elementType);
-			case MAP:
-				TypeDescription keySchema = schema.getChildren().get(0);
-				TypeDescription valSchema = schema.getChildren().get(1);
-				TypeInformation<?> keyType = schemaToTypeInfo(keySchema);
-				TypeInformation<?> valType = schemaToTypeInfo(valSchema);
-				return new MapTypeInfo<>(keyType, valType);
-			case UNION:
-				throw new UnsupportedOperationException("UNION type is not supported yet.");
-			default:
-				throw new IllegalArgumentException("Unknown type " + schema);
-		}
-	}
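	// A minimal usage sketch of the mapping above (illustrative only, not part of
	// the original file); TypeDescription.fromString() and RowTypeInfo are the
	// APIs already used in this module:
	//
	//   TypeDescription schema = TypeDescription.fromString("struct<id:int,name:string>");
	//   TypeInformation typeInfo = schemaToTypeInfo(schema);
	//   // typeInfo is a RowTypeInfo with field names {"id", "name"} and
	//   // field types {BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO}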
-
-	/**
-	 * Fills an ORC batch into an array of Row.
-	 *
-	 * @param rows The batch of rows to be filled.
-	 * @param schema The schema of the ORC data.
-	 * @param batch The ORC data.
-	 * @param selectedFields The list of selected ORC fields.
-	 * @return The number of rows that were filled.
-	 */
-	static int fillRows(Row[] rows, TypeDescription schema, VectorizedRowBatch batch, int[] selectedFields) {
-
-		int rowsToRead = Math.min((int) batch.count(), rows.length);
-
-		List<TypeDescription> fieldTypes = schema.getChildren();
-		// read each selected field
-		for (int rowIdx = 0; rowIdx < selectedFields.length; rowIdx++) {
-			int orcIdx = selectedFields[rowIdx];
-			readField(rows, rowIdx, fieldTypes.get(orcIdx), batch.cols[orcIdx], null, rowsToRead);
-		}
-		return rowsToRead;
-	}
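	// A minimal usage sketch (illustrative only; `schema` and `batch` are assumed
	// to come from an ORC Reader/RecordReader, which is not shown here):
	//
	//   Row[] rows = new Row[(int) batch.count()];
	//   for (int i = 0; i < rows.length; i++) {
	//       rows[i] = new Row(2);
	//   }
	//   // project ORC fields 0 and 2 into fields 0 and 1 of each Row
	//   int filled = fillRows(rows, schema, batch, new int[]{0, 2});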
-
-	/**
-	 * Reads a vector of data into an array of objects.
-	 *
-	 * @param vals The array that needs to be filled.
-	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
-	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
-	 * @param schema The schema of the vector to read.
-	 * @param vector The vector to read.
-	 * @param lengthVector If the vector is of type List or Map, the number of sub-elements to read for each field. Otherwise, it must be null.
-	 * @param childCount The number of vector entries to read.
-	 */
-	private static void readField(Object[] vals, int fieldIdx, TypeDescription schema, ColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check the type of the vector to decide how to read it.
-		switch (schema.getCategory()) {
-			case BOOLEAN:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readBoolean, OrcUtils::boolArray);
-				}
-				break;
-			case BYTE:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readByte, OrcUtils::byteArray);
-				}
-				break;
-			case SHORT:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readShort, OrcUtils::shortArray);
-				}
-				break;
-			case INT:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readInt, OrcUtils::intArray);
-				}
-				break;
-			case LONG:
-				if (vector.noNulls) {
-					readNonNullLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
-				} else {
-					readLongColumn(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount, OrcUtils::readLong, OrcUtils::longArray);
-				}
-				break;
-			case FLOAT:
-				if (vector.noNulls) {
-					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
-				} else {
-					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readFloat, OrcUtils::floatArray);
-				}
-				break;
-			case DOUBLE:
-				if (vector.noNulls) {
-					readNonNullDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
-				} else {
-					readDoubleColumn(vals, fieldIdx, (DoubleColumnVector) vector, lengthVector, childCount, OrcUtils::readDouble, OrcUtils::doubleArray);
-				}
-				break;
-			case CHAR:
-			case VARCHAR:
-			case STRING:
-				if (vector.noNulls) {
-					readNonNullBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				} else {
-					readBytesColumnAsString(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case DATE:
-				if (vector.noNulls) {
-					readNonNullLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
-				} else {
-					readLongColumnAsDate(vals, fieldIdx, (LongColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case TIMESTAMP:
-				if (vector.noNulls) {
-					readNonNullTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
-				} else {
-					readTimestampColumn(vals, fieldIdx, (TimestampColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case BINARY:
-				if (vector.noNulls) {
-					readNonNullBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				} else {
-					readBytesColumnAsBinary(vals, fieldIdx, (BytesColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case DECIMAL:
-				if (vector.noNulls) {
-					readNonNullDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
-				} else {
-					readDecimalColumn(vals, fieldIdx, (DecimalColumnVector) vector, lengthVector, childCount);
-				}
-				break;
-			case STRUCT:
-				if (vector.noNulls) {
-					readNonNullStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
-				} else {
-					readStructColumn(vals, fieldIdx, (StructColumnVector) vector, schema, lengthVector, childCount);
-				}
-				break;
-			case LIST:
-				if (vector.noNulls) {
-					readNonNullListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
-				} else {
-					readListColumn(vals, fieldIdx, (ListColumnVector) vector, schema, lengthVector, childCount);
-				}
-				break;
-			case MAP:
-				if (vector.noNulls) {
-					readNonNullMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
-				} else {
-					readMapColumn(vals, fieldIdx, (MapColumnVector) vector, schema, lengthVector, childCount);
-				}
-				break;
-			case UNION:
-				throw new UnsupportedOperationException("UNION type not supported yet");
-			default:
-				throw new IllegalArgumentException("Unknown type " + schema);
-		}
-	}
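	// The two calling modes described above, sketched for illustration only
	// (variable names are assumed, not defined in this file):
	//
	//   // top-level column: fill field 0 of each Row directly
	//   readField(rows, 0, fieldTypes.get(0), batch.cols[0], null, rowsToRead);
	//
	//   // nested in a LIST: fill a plain Object[] with one array per list entry,
	//   // sliced according to the list's lengths vector
	//   readField(elements, -1, elementType, listVector.child, listVector.lengths, listVector.childCount);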
-
-	private static <T> void readNonNullLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
-													LongFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = reader.apply(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-					}
-				}
-			}
-		} else { // in a list
-			T[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = reader.apply(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static <T> void readNonNullDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
-													DoubleFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = reader.apply(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-					}
-				}
-			}
-		} else { // in a list
-			T[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				T repeatingValue = reader.apply(vector.vector[0]);
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = reader.apply(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0], StandardCharsets.UTF_8);
-				fillColumnWithRepeatingValue(vals, fieldIdx, repeatingValue, childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8));
-					}
-				}
-			}
-		} else {
-			String[] temp;
-			int offset = 0;
-			if (bytes.isRepeating) { // fill complete list with first value
-				String repeatingValue = new String(bytes.vector[0], bytes.start[0], bytes.length[0], StandardCharsets.UTF_8);
-				for (int i = 0; offset < childCount; i++) {
-					temp = new String[(int) lengthVector[i]];
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new String[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset], StandardCharsets.UTF_8);
-						offset++;
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						// don't reuse repeating val to avoid object mutation
-						vals[i] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						// don't reuse repeating val to avoid object mutation
-						rows[i].setField(fieldIdx, readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]));
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
-					}
-				}
-			}
-		} else {
-			byte[][] temp;
-			int offset = 0;
-			if (bytes.isRepeating) { // fill complete list with first value
-				for (int i = 0; offset < childCount; i++) {
-					temp = new byte[(int) lengthVector[i]][];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readBinary(bytes.vector[0], bytes.start[0], bytes.length[0]);
-					}
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new byte[(int) lengthVector[i]][];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-						offset++;
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse repeated value due to mutability of Date
-						vals[i] = readDate(vector.vector[0]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse repeated value due to mutability of Date
-						rows[i].setField(fieldIdx, readDate(vector.vector[0]));
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readDate(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readDate(vector.vector[i]));
-					}
-				}
-			}
-		} else { // in a list
-			Date[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Date[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readDate(vector.vector[0]);
-					}
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Date[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readDate(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the timestamps need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse value to prevent object mutation
-						vals[i] = readTimestamp(vector.time[0], vector.nanos[0]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						// do not reuse value to prevent object mutation
-						rows[i].setField(fieldIdx, readTimestamp(vector.time[0], vector.nanos[0]));
-					}
-				}
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readTimestamp(vector.time[i], vector.nanos[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readTimestamp(vector.time[i], vector.nanos[i]));
-					}
-				}
-			}
-		} else {
-			Timestamp[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Timestamp[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						// do not reuse value to prevent object mutation
-						temp[j] = readTimestamp(vector.time[0], vector.nanos[0]);
-					}
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Timestamp[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]);
-						offset++;
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readNonNullDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the decimals need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				fillColumnWithRepeatingValue(vals, fieldIdx, readBigDecimal(vector.vector[0]), childCount);
-			} else {
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						vals[i] = readBigDecimal(vector.vector[i]);
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
-					}
-				}
-			}
-		} else {
-			BigDecimal[] temp;
-			int offset = 0;
-			if (vector.isRepeating) { // fill complete list with first value
-				BigDecimal repeatingValue = readBigDecimal(vector.vector[0]);
-				for (int i = 0; offset < childCount; i++) {
-					temp = new BigDecimal[(int) lengthVector[i]];
-					Arrays.fill(temp, repeatingValue);
-					offset += temp.length;
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			} else {
-				for (int i = 0; offset < childCount; i++) {
-					temp = new BigDecimal[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						temp[j] = readBigDecimal(vector.vector[offset++]);
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-
-	}
-
-	private static void readNonNullStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> childrenTypes = schema.getChildren();
-
-		int numFields = childrenTypes.size();
-		// create a batch of Rows to read the structs
-		Row[] structs = new Row[childCount];
-		// TODO: possible improvement: reuse existing Row objects
-		for (int i = 0; i < childCount; i++) {
-			structs[i] = new Row(numFields);
-		}
-
-		// read struct fields
-		for (int i = 0; i < numFields; i++) {
-			readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount);
-		}
-
-		// check if the structs need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (fieldIdx == -1) { // set struct as an object
-				System.arraycopy(structs, 0, vals, 0, childCount);
-			} else { // set struct as a field of Row
-				Row[] rows = (Row[]) vals;
-				for (int i = 0; i < childCount; i++) {
-					rows[i].setField(fieldIdx, structs[i]);
-				}
-			}
-		} else { // struct in a list
-			int offset = 0;
-			Row[] temp;
-			for (int i = 0; offset < childCount; i++) {
-				temp = new Row[(int) lengthVector[i]];
-				System.arraycopy(structs, offset, temp, 0, temp.length);
-				offset = offset + temp.length;
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readNonNullListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		TypeDescription fieldType = schema.getChildren().get(0);
-		// check if the list needs to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorNested = list.lengths;
-			readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
-		} else { // list in a list
-			Object[] nestedLists = new Object[childCount];
-			// length vector for nested list
-			long[] lengthVectorNested = list.lengths;
-			// read nested list
-			readField(nestedLists, -1, fieldType, list.child, lengthVectorNested, list.childCount);
-			// get type of nestedList
-			Class<?> classType = nestedLists[0].getClass();
-
-			// fill outer list with nested list
-			int offset = 0;
-			int length;
-			for (int i = 0; offset < childCount; i++) {
-				length = (int) lengthVector[i];
-				Object[] temp = (Object[]) Array.newInstance(classType, length);
-				System.arraycopy(nestedLists, offset, temp, 0, length);
-				offset = offset + length;
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readNonNullMapColumn(Object[] vals, int fieldIdx, MapColumnVector mapsVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> fieldType = schema.getChildren();
-		TypeDescription keyType = fieldType.get(0);
-		TypeDescription valueType = fieldType.get(1);
-
-		ColumnVector keys = mapsVector.keys;
-		ColumnVector values = mapsVector.values;
-		Object[] keyRows = new Object[mapsVector.childCount];
-		Object[] valueRows = new Object[mapsVector.childCount];
-
-		// read map keys and values
-		readField(keyRows, -1, keyType, keys, null, keyRows.length);
-		readField(valueRows, -1, valueType, values, null, valueRows.length);
-
-		// check if the maps need to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorMap = mapsVector.lengths;
-			int offset = 0;
-
-			for (int i = 0; i < childCount; i++) {
-				long numMapEntries = lengthVectorMap[i];
-				HashMap map = readHashMap(keyRows, valueRows, offset, numMapEntries);
-				offset += numMapEntries;
-
-				if (fieldIdx == -1) {
-					vals[i] = map;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, map);
-				}
-			}
-		} else { // list of map
-
-			long[] lengthVectorMap = mapsVector.lengths;
-			int mapOffset = 0; // offset of map element
-			int offset = 0; // offset of map
-			HashMap[] temp;
-
-			for (int i = 0; offset < childCount; i++) {
-				temp = new HashMap[(int) lengthVector[i]];
-				for (int j = 0; j < temp.length; j++) {
-					long numMapEntries = lengthVectorMap[offset];
-					temp[j] = readHashMap(keyRows, valueRows, mapOffset, numMapEntries);
-					mapOffset += numMapEntries;
-					offset++;
-				}
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static <T> void readLongColumn(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount,
-											LongFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = reader.apply(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
-			} else {
-				// column contains null values
-				int offset = 0;
-				T[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = reader.apply(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static <T> void readDoubleColumn(Object[] vals, int fieldIdx, DoubleColumnVector vector, long[] lengthVector, int childCount,
-												DoubleFunction<T> reader, IntFunction<T[]> array) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = reader.apply(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, reader.apply(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, array);
-			} else {
-				// column contains null values
-				int offset = 0;
-				T[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = array.apply((int) lengthVector[i]);
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = reader.apply(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readBytesColumnAsString(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = bytes.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, new String(bytes.vector[i], bytes.start[i], bytes.length[i], StandardCharsets.UTF_8));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (bytes.isRepeating) { // fill list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::stringArray);
-			} else {
-				int offset = 0;
-				String[] temp;
-				boolean[] isNullVector = bytes.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new String[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = new String(bytes.vector[offset], bytes.start[offset], bytes.length[offset], StandardCharsets.UTF_8);
-							offset++;
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readBytesColumnAsBinary(Object[] vals, int fieldIdx, BytesColumnVector bytes, long[] lengthVector, int childCount) {
-
-		// check if the binary need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (bytes.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = bytes.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, readBinary(bytes.vector[i], bytes.start[i], bytes.length[i]));
-						}
-					}
-				}
-			}
-		} else {
-			if (bytes.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::binaryArray);
-			} else {
-				int offset = 0;
-				byte[][] temp;
-				boolean[] isNullVector = bytes.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new byte[(int) lengthVector[i]][];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readBinary(bytes.vector[offset], bytes.start[offset], bytes.length[offset]);
-							offset++;
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readLongColumnAsDate(Object[] vals, int fieldIdx, LongColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the values need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = readDate(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, readDate(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else { // in a list
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::dateArray);
-			} else {
-				// column contains null values
-				int offset = 0;
-				Date[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Date[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readDate(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readTimestampColumn(Object[] vals, int fieldIdx, TimestampColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the timestamps need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
-							vals[i] = ts;
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							Timestamp ts = readTimestamp(vector.time[i], vector.nanos[i]);
-							rows[i].setField(fieldIdx, ts);
-						}
-					}
-				}
-			}
-		} else {
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::timestampArray);
-			} else {
-				int offset = 0;
-				Timestamp[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new Timestamp[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readTimestamp(vector.time[offset], vector.nanos[offset]);
-							offset++;
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readDecimalColumn(Object[] vals, int fieldIdx, DecimalColumnVector vector, long[] lengthVector, int childCount) {
-
-		// check if the decimals need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (vector.isRepeating) { // fill complete column with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillColumnWithRepeatingValue(vals, fieldIdx, null, childCount);
-			} else {
-				boolean[] isNullVector = vector.isNull;
-				if (fieldIdx == -1) { // set as an object
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							vals[i] = null;
-						} else {
-							vals[i] = readBigDecimal(vector.vector[i]);
-						}
-					}
-				} else { // set as a field of Row
-					Row[] rows = (Row[]) vals;
-					for (int i = 0; i < childCount; i++) {
-						if (isNullVector[i]) {
-							rows[i].setField(fieldIdx, null);
-						} else {
-							rows[i].setField(fieldIdx, readBigDecimal(vector.vector[i]));
-						}
-					}
-				}
-			}
-		} else {
-			if (vector.isRepeating) { // fill complete list with first value
-				// since the column contains null values and has just one distinct value, the repeated value is null
-				fillListWithRepeatingNull(vals, fieldIdx, lengthVector, childCount, OrcUtils::decimalArray);
-			} else {
-				int offset = 0;
-				BigDecimal[] temp;
-				boolean[] isNullVector = vector.isNull;
-				for (int i = 0; offset < childCount; i++) {
-					temp = new BigDecimal[(int) lengthVector[i]];
-					for (int j = 0; j < temp.length; j++) {
-						if (isNullVector[offset]) {
-							offset++;
-						} else {
-							temp[j] = readBigDecimal(vector.vector[offset++]);
-						}
-					}
-					if (fieldIdx == -1) {
-						vals[i] = temp;
-					} else {
-						((Row) vals[i]).setField(fieldIdx, temp);
-					}
-				}
-			}
-		}
-	}
-
-	private static void readStructColumn(Object[] vals, int fieldIdx, StructColumnVector structVector, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> childrenTypes = schema.getChildren();
-
-		int numFields = childrenTypes.size();
-		// create a batch of Rows to read the structs
-		Row[] structs = new Row[childCount];
-		// TODO: possible improvement: reuse existing Row objects
-		for (int i = 0; i < childCount; i++) {
-			structs[i] = new Row(numFields);
-		}
-
-		// read struct fields
-		for (int i = 0; i < numFields; i++) {
-			readField(structs, i, childrenTypes.get(i), structVector.fields[i], null, childCount);
-		}
-
-		boolean[] isNullVector = structVector.isNull;
-
-		// check if the structs need to be read into lists or as single values
-		if (lengthVector == null) {
-			if (fieldIdx == -1) { // set struct as an object
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						vals[i] = null;
-					} else {
-						vals[i] = structs[i];
-					}
-				}
-			} else { // set struct as a field of Row
-				Row[] rows = (Row[]) vals;
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						rows[i].setField(fieldIdx, null);
-					} else {
-						rows[i].setField(fieldIdx, structs[i]);
-					}
-				}
-			}
-		} else { // struct in a list
-			int offset = 0;
-			Row[] temp;
-			for (int i = 0; offset < childCount; i++) {
-				temp = new Row[(int) lengthVector[i]];
-				for (int j = 0; j < temp.length; j++) {
-					if (isNullVector[offset]) {
-						temp[j] = null;
-						offset++;
-					} else {
-						temp[j] = structs[offset++];
-					}
-				}
-				if (fieldIdx == -1) { // set list of structs as an object
-					vals[i] = temp;
-				} else { // set list of structs as field of row
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readListColumn(Object[] vals, int fieldIdx, ListColumnVector list, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		TypeDescription fieldType = schema.getChildren().get(0);
-		// check if the lists need to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorNested = list.lengths;
-			readField(vals, fieldIdx, fieldType, list.child, lengthVectorNested, list.childCount);
-		} else { // list in a list
-			Object[] nestedList = new Object[childCount];
-			// length vector for nested list
-			long[] lengthVectorNested = list.lengths;
-			// read nested list
-			readField(nestedList, -1, fieldType, list.child, lengthVectorNested, list.childCount);
-
-			// fill outer list with nested list
-			int offset = 0;
-			int length;
-			// get type of nestedList
-			Class<?> classType = nestedList[0].getClass();
-			for (int i = 0; offset < childCount; i++) {
-				length = (int) lengthVector[i];
-				Object[] temp = (Object[]) Array.newInstance(classType, length);
-				System.arraycopy(nestedList, offset, temp, 0, length);
-				offset = offset + length;
-				if (fieldIdx == -1) { // set list of list as an object
-					vals[i] = temp;
-				} else { // set list of list as field of row
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	private static void readMapColumn(Object[] vals, int fieldIdx, MapColumnVector map, TypeDescription schema, long[] lengthVector, int childCount) {
-
-		List<TypeDescription> fieldType = schema.getChildren();
-		TypeDescription keyType = fieldType.get(0);
-		TypeDescription valueType = fieldType.get(1);
-
-		ColumnVector keys = map.keys;
-		ColumnVector values = map.values;
-		Object[] keyRows = new Object[map.childCount];
-		Object[] valueRows = new Object[map.childCount];
-
-		// read map keys and values
-		readField(keyRows, -1, keyType, keys, null, keyRows.length);
-		readField(valueRows, -1, valueType, values, null, valueRows.length);
-
-		boolean[] isNullVector = map.isNull;
-
-		// check if the maps need to be read into lists or as single values
-		if (lengthVector == null) {
-			long[] lengthVectorMap = map.lengths;
-			int offset = 0;
-			if (fieldIdx == -1) { // set map as an object
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						vals[i] = null;
-					} else {
-						vals[i] = readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]);
-						offset += lengthVectorMap[i];
-					}
-				}
-			} else { // set map as a field of Row
-				Row[] rows = (Row[]) vals;
-				for (int i = 0; i < childCount; i++) {
-					if (isNullVector[i]) {
-						rows[i].setField(fieldIdx, null);
-					} else {
-						rows[i].setField(fieldIdx, readHashMap(keyRows, valueRows, offset, lengthVectorMap[i]));
-						offset += lengthVectorMap[i];
-					}
-				}
-			}
-		} else { // list of map
-			long[] lengthVectorMap = map.lengths;
-			int mapOffset = 0; // offset of map element
-			int offset = 0; // offset of map
-			HashMap[] temp;
-
-			for (int i = 0; offset < childCount; i++) {
-				temp = new HashMap[(int) lengthVector[i]];
-				for (int j = 0; j < temp.length; j++) {
-					if (isNullVector[offset]) {
-						temp[j] = null;
-						offset++;
-					} else {
-						temp[j] = readHashMap(keyRows, valueRows, mapOffset, lengthVectorMap[offset]);
-						mapOffset += lengthVectorMap[offset];
-						offset++;
-					}
-				}
-				if (fieldIdx == -1) {
-					vals[i] = temp;
-				} else {
-					((Row) vals[i]).setField(fieldIdx, temp);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Sets a repeating value to all objects or row fields of the passed vals array.
-	 *
-	 * @param vals The array of objects or Rows.
-	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
-	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
-	 * @param repeatingValue The value that is set.
-	 * @param childCount The number of times the value is set.
-	 */
-	private static void fillColumnWithRepeatingValue(Object[] vals, int fieldIdx, Object repeatingValue, int childCount) {
-
-		if (fieldIdx == -1) {
-			// set value as an object
-			Arrays.fill(vals, 0, childCount, repeatingValue);
-		} else {
-			// set value as a field of Row
-			Row[] rows = (Row[]) vals;
-			for (int i = 0; i < childCount; i++) {
-				rows[i].setField(fieldIdx, repeatingValue);
-			}
-		}
-	}
-
-	/**
-	 * Sets arrays containing only null values to all objects or row fields of the passed vals array.
-	 *
-	 * @param vals The array of objects or Rows to which the null-filled arrays are set.
-	 * @param fieldIdx If the vals array is an array of Row, the index of the field that needs to be filled.
-	 *                 Otherwise a -1 must be passed and the data is directly filled into the array.
-	 * @param lengthVector The vector containing the lengths of the individual null-filled arrays.
-	 * @param childCount The number of objects or Rows to fill.
-	 * @param array A method to create arrays of the appropriate type.
-	 * @param <T> The type of the arrays to create.
-	 */
-	private static <T> void fillListWithRepeatingNull(Object[] vals, int fieldIdx, long[] lengthVector, int childCount, IntFunction<T[]> array) {
-
-		if (fieldIdx == -1) {
-			// set null-filled array as object
-			for (int i = 0; i < childCount; i++) {
-				vals[i] = array.apply((int) lengthVector[i]);
-			}
-		} else {
-			// set null-filled array as field in Row
-			Row[] rows = (Row[]) vals;
-			for (int i = 0; i < childCount; i++) {
-				rows[i].setField(fieldIdx, array.apply((int) lengthVector[i]));
-			}
-		}
-	}
-
-	private static Boolean readBoolean(long l) {
-		return l != 0;
-	}
-
-	private static Byte readByte(long l) {
-		return (byte) l;
-	}
-
-	private static Short readShort(long l) {
-		return (short) l;
-	}
-
-	private static Integer readInt(long l) {
-		return (int) l;
-	}
-
-	private static Long readLong(long l) {
-		return l;
-	}
-
-	private static Float readFloat(double d) {
-		return (float) d;
-	}
-
-	private static Double readDouble(double d) {
-		return d;
-	}
-
-	private static Date readDate(long l) {
-		// day to milliseconds
-		final long t = l * MILLIS_PER_DAY;
-		// adjust by local timezone
-		return new java.sql.Date(t - LOCAL_TZ.getOffset(t));
-	}
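	// Worked example (illustrative only): for l = 1, i.e. one day after the epoch,
	// t = 86400000 ms = 1970-01-02T00:00 UTC. In a UTC+1 zone, getOffset(t) is
	// 3600000 ms, so the Date wraps 82800000 ms (1970-01-01T23:00 UTC), which
	// java.sql.Date renders locally as "1970-01-02", the intended calendar day.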
-
-	private static byte[] readBinary(byte[] src, int srcPos, int length) {
-		byte[] result = new byte[length];
-		System.arraycopy(src, srcPos, result, 0, length);
-		return result;
-	}
-
-	private static BigDecimal readBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
-		HiveDecimal hiveDecimal = hiveDecimalWritable.getHiveDecimal();
-		return hiveDecimal.bigDecimalValue();
-	}
-
-	private static Timestamp readTimestamp(long time, int nanos) {
-		Timestamp ts = new Timestamp(time);
-		ts.setNanos(nanos);
-		return ts;
-	}
-
-	private static HashMap readHashMap(Object[] keyRows, Object[] valueRows, int offset, long length) {
-		HashMap<Object, Object> resultMap = new HashMap<>();
-		for (int j = 0; j < length; j++) {
-			resultMap.put(keyRows[offset], valueRows[offset]);
-			offset++;
-		}
-		return resultMap;
-	}
-
-	private static Boolean[] boolArray(int len) {
-		return new Boolean[len];
-	}
-
-	private static Byte[] byteArray(int len) {
-		return new Byte[len];
-	}
-
-	private static Short[] shortArray(int len) {
-		return new Short[len];
-	}
-
-	private static Integer[] intArray(int len) {
-		return new Integer[len];
-	}
-
-	private static Long[] longArray(int len) {
-		return new Long[len];
-	}
-
-	private static Float[] floatArray(int len) {
-		return new Float[len];
-	}
-
-	private static Double[] doubleArray(int len) {
-		return new Double[len];
-	}
-
-	private static Date[] dateArray(int len) {
-		return new Date[len];
-	}
-
-	private static byte[][] binaryArray(int len) {
-		return new byte[len][];
-	}
-
-	private static String[] stringArray(int len) {
-		return new String[len];
-	}
-
-	private static BigDecimal[] decimalArray(int len) {
-		return new BigDecimal[len];
-	}
-
-	private static Timestamp[] timestampArray(int len) {
-		return new Timestamp[len];
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
new file mode 100644
index 0000000..b90313e
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcBatchReaderTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+
+import org.apache.orc.TypeDescription;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link OrcBatchReader}.
+ */
+public class OrcBatchReaderTest {
+
+	@Test
+	public void testFlatSchemaToTypeInfo1() {
+
+		String schema =
+			"struct<" +
+				"boolean1:boolean," +
+				"byte1:tinyint," +
+				"short1:smallint," +
+				"int1:int," +
+				"long1:bigint," +
+				"float1:float," +
+				"double1:double," +
+				"bytes1:binary," +
+				"string1:string," +
+				"date1:date," +
+				"timestamp1:timestamp," +
+				"decimal1:decimal(5,2)" +
+			">";
+		TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+		Assert.assertNotNull(typeInfo);
+		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+		// validate field types
+		Assert.assertArrayEquals(
+			new TypeInformation[]{
+				Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
+				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
+				Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
+			},
+			rowTypeInfo.getFieldTypes());
+
+		// validate field names
+		Assert.assertArrayEquals(
+			new String[] {
+				"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
+				"bytes1", "string1", "date1", "timestamp1", "decimal1"
+			},
+			rowTypeInfo.getFieldNames());
+
+	}
+
+	@Test
+	public void testNestedSchemaToTypeInfo1() {
+
+		String schema =
+			"struct<" +
+				"middle:struct<" +
+					"list:array<" +
+						"struct<" +
+							"int1:int," +
+							"string1:string" +
+						">" +
+					">" +
+				">," +
+				"list:array<" +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">," +
+				"map:map<" +
+					"string," +
+					"struct<" +
+						"int1:int," +
+						"string1:string" +
+					">" +
+				">" +
+			">";
+		TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(TypeDescription.fromString(schema));
+
+		Assert.assertNotNull(typeInfo);
+		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
+		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
+
+		// validate field types
+		Assert.assertArrayEquals(
+			new TypeInformation[]{
+				Types.ROW_NAMED(
+					new String[]{"list"},
+					ObjectArrayTypeInfo.getInfoFor(
+						Types.ROW_NAMED(
+							new String[]{"int1", "string1"},
+							Types.INT, Types.STRING
+						)
+					)
+				),
+				ObjectArrayTypeInfo.getInfoFor(
+					Types.ROW_NAMED(
+						new String[]{"int1", "string1"},
+						Types.INT, Types.STRING
+					)
+				),
+				new MapTypeInfo<>(
+					Types.STRING,
+					Types.ROW_NAMED(
+						new String[]{"int1", "string1"},
+						Types.INT, Types.STRING
+					)
+				)
+			},
+			rowTypeInfo.getFieldTypes());
+
+		// validate field names
+		Assert.assertArrayEquals(
+			new String[] {"middle", "list", "map"},
+			rowTypeInfo.getFieldNames());
+	}
+}
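
For reference, the mapping these tests exercise can be sketched as a standalone call. The class name and schema below are illustrative only; since schemaToTypeInfo is not public, the sketch assumes it is called from the org.apache.flink.orc package, as the tests above do.

package org.apache.flink.orc;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.orc.TypeDescription;

/** Illustrative sketch only: derives Flink type information from an ORC schema string. */
class SchemaToTypeInfoSketch {

	public static void main(String[] args) {
		// parse an ORC schema string and map it to Flink's type system
		TypeDescription schema = TypeDescription.fromString("struct<id:int,name:string>");
		TypeInformation typeInfo = OrcBatchReader.schemaToTypeInfo(schema);

		// flat structs map to a RowTypeInfo with matching field names and types
		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
		System.out.println(rowTypeInfo);
	}
}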

http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
index 0efe41f..2eb3231 100644
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcRowInputFormatTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.orc.util.OrcTestFileGenerator;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.any;
@@ -124,6 +126,32 @@ public class OrcRowInputFormatTest {
 	private static final String TEST_FILE_NESTEDLIST = "test-data-nestedlist.orc";
 	private static final String TEST_SCHEMA_NESTEDLIST = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
 
+	/** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithNullsFile(String)}. */
+	private static final String TEST_FILE_COMPOSITES_NULLS = "test-data-composites-with-nulls.orc";
+	private static final String TEST_SCHEMA_COMPOSITES_NULLS =
+		"struct<" +
+			"int1:int," +
+			"record1:struct<f1:int,f2:string>," +
+			"list1:array<array<array<struct<f1:string,f2:string>>>>," +
+			"list2:array<map<string,int>>" +
+		">";
+
+	/** Generated by {@link OrcTestFileGenerator#writeCompositeTypesWithRepeatingFile(String)}. */
+	private static final String TEST_FILE_REPEATING = "test-data-repeating.orc";
+	private static final String TEST_SCHEMA_REPEATING =
+		"struct<" +
+			"int1:int," +
+			"int2:int," +
+			"int3:int," +
+			"record1:struct<f1:int,f2:string>," +
+			"record2:struct<f1:int,f2:string>," +
+			"list1:array<int>," +
+			"list2:array<int>," +
+			"list3:array<int>," +
+			"map1:map<int,string>," +
+			"map2:map<int,string>" +
+		">";
+
 	@Test(expected = FileNotFoundException.class)
 	public void testInvalidPath() throws IOException{
 		rowOrcInputFormat =
@@ -477,7 +505,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadNestedFile() throws IOException{
+	public void testReadNestedFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
 
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -563,7 +591,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadTimeTypeFile() throws IOException{
+	public void testReadTimeTypeFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_TIMETYPES), TEST_SCHEMA_TIMETYPES, new Configuration());
 
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -590,7 +618,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadDecimalTypeFile() throws IOException{
+	public void testReadDecimalTypeFile() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_DECIMAL), TEST_SCHEMA_DECIMAL, new Configuration());
 
 		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
@@ -653,7 +681,183 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadWithProjection() throws IOException{
+	public void testReadCompositesNullsFile() throws Exception {
+		rowOrcInputFormat = new OrcRowInputFormat(
+			getPath(TEST_FILE_COMPOSITES_NULLS),
+			TEST_SCHEMA_COMPOSITES_NULLS,
+			new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+
+		Row row = null;
+		long cnt = 0;
+
+		int structNullCnt = 0;
+		int nestedListNullCnt = 0;
+		int mapListNullCnt = 0;
+
+		// read all rows
+		while (!rowOrcInputFormat.reachedEnd()) {
+
+			row = rowOrcInputFormat.nextRecord(row);
+			assertEquals(4, row.getArity());
+
+			assertTrue(row.getField(0) instanceof Integer);
+
+			if (row.getField(1) == null) {
+				structNullCnt++;
+			} else {
+				Object f = row.getField(1);
+				assertTrue(f instanceof Row);
+				assertEquals(2, ((Row) f).getArity());
+			}
+
+			if (row.getField(2) == null) {
+				nestedListNullCnt++;
+			} else {
+				Object f = row.getField(2);
+				assertTrue(f instanceof Row[][][]);
+				assertEquals(4, ((Row[][][]) f).length);
+			}
+
+			if (row.getField(3) == null) {
+				mapListNullCnt++;
+			} else {
+				Object f = row.getField(3);
+				assertTrue(f instanceof HashMap[]);
+				assertEquals(3, ((HashMap[]) f).length);
+			}
+			cnt++;
+		}
+		// number of rows in file
+		assertEquals(2500, cnt);
+		// check number of null fields
+		assertEquals(1250, structNullCnt);
+		assertEquals(835, nestedListNullCnt);
+		assertEquals(835, mapListNullCnt);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void testReadRepeatingValuesFile() throws IOException {
+		rowOrcInputFormat = new OrcRowInputFormat(
+			getPath(TEST_FILE_REPEATING),
+			TEST_SCHEMA_REPEATING,
+			new Configuration());
+
+		FileInputSplit[] splits = rowOrcInputFormat.createInputSplits(1);
+		assertEquals(1, splits.length);
+		rowOrcInputFormat.openInputFormat();
+		rowOrcInputFormat.open(splits[0]);
+
+		assertFalse(rowOrcInputFormat.reachedEnd());
+
+		Row row = null;
+		long cnt = 0;
+
+		Row firstRow1 = null;
+		Integer[] firstList1 = null;
+		HashMap firstMap1 = null;
+
+		// read all rows
+		while (!rowOrcInputFormat.reachedEnd()) {
+
+			cnt++;
+			row = rowOrcInputFormat.nextRecord(row);
+			assertEquals(10, row.getArity());
+
+			// check first int field (always 42)
+			assertNotNull(row.getField(0));
+			assertTrue(row.getField(0) instanceof Integer);
+			assertEquals(42, ((Integer) row.getField(0)).intValue());
+
+			// check second int field (always null)
+			assertNull(row.getField(1));
+
+			// check third int field (always 99)
+			assertNotNull(row.getField(2));
+			assertTrue(row.getField(2) instanceof Integer);
+			assertEquals(99, ((Integer) row.getField(2)).intValue());
+
+			// check first row field (always (23, null))
+			assertNotNull(row.getField(3));
+			assertTrue(row.getField(3) instanceof Row);
+			Row nestedRow = (Row) row.getField(3);
+			// check first field of nested row
+			assertNotNull(nestedRow.getField(0));
+			assertTrue(nestedRow.getField(0) instanceof Integer);
+			assertEquals(23, ((Integer) nestedRow.getField(0)).intValue());
+			// check second field of nested row
+			assertNull(nestedRow.getField(1));
+			// validate reference
+			if (firstRow1 == null) {
+				firstRow1 = nestedRow;
+			} else {
+				// repeated rows must be different instances
+				assertTrue(firstRow1 != nestedRow);
+			}
+
+			// check second row field (always null)
+			assertNull(row.getField(4));
+
+			// check first list field (always [1, 2, 3])
+			assertNotNull(row.getField(5));
+			assertTrue(row.getField(5) instanceof Integer[]);
+			Integer[] list1 = ((Integer[]) row.getField(5));
+			assertEquals(1, list1[0].intValue());
+			assertEquals(2, list1[1].intValue());
+			assertEquals(3, list1[2].intValue());
+			// validate reference
+			if (firstList1 == null) {
+				firstList1 = list1;
+			} else {
+				// repeated lists must be different instances
+				assertTrue(firstList1 != list1);
+			}
+
+			// check second list field (always [7, 7, 7])
+			assertNotNull(row.getField(6));
+			assertTrue(row.getField(6) instanceof Integer[]);
+			Integer[] list2 = ((Integer[]) row.getField(6));
+			assertEquals(7, list2[0].intValue());
+			assertEquals(7, list2[1].intValue());
+			assertEquals(7, list2[2].intValue());
+
+			// check third list field (always null)
+			assertNull(row.getField(7));
+
+			// check first map field (always {2->"Hello", 4->"Hello"})
+			assertNotNull(row.getField(8));
+			assertTrue(row.getField(8) instanceof HashMap);
+			HashMap<Integer, String> map = (HashMap<Integer, String>) row.getField(8);
+			assertEquals(2, map.size());
+			assertEquals("Hello", map.get(2));
+			assertEquals("Hello", map.get(4));
+			// validate reference
+			if (firstMap1 == null) {
+				firstMap1 = map;
+			} else {
+				// repeated maps must be different instances
+				assertTrue(firstMap1 != map);
+			}
+
+			// check second map field (always null)
+			assertNull(row.getField(9));
+		}
+
+		rowOrcInputFormat.close();
+		rowOrcInputFormat.closeInputFormat();
+
+		assertEquals(256, cnt);
+	}
+
+	@Test
+	public void testReadWithProjection() throws IOException {
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_NESTED), TEST_SCHEMA_NESTED, new Configuration());
 
 		rowOrcInputFormat.selectFields(7, 0, 10, 8);
@@ -691,7 +895,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileInSplits() throws IOException{
+	public void testReadFileInSplits() throws IOException {
 
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
 		rowOrcInputFormat.selectFields(0, 1);
@@ -717,7 +921,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileWithFilter() throws IOException{
+	public void testReadFileWithFilter() throws IOException {
 
 		rowOrcInputFormat = new OrcRowInputFormat(getPath(TEST_FILE_FLAT), TEST_SCHEMA_FLAT, new Configuration());
 		rowOrcInputFormat.selectFields(0, 1);
@@ -751,7 +955,7 @@ public class OrcRowInputFormatTest {
 	}
 
 	@Test
-	public void testReadFileWithEvolvedSchema() throws IOException{
+	public void testReadFileWithEvolvedSchema() throws IOException {
 
 		rowOrcInputFormat = new OrcRowInputFormat(
 			getPath(TEST_FILE_FLAT),

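All of the read tests above share the same lifecycle: create splits, open the format, loop over nextRecord(), then close. A minimal standalone sketch of that pattern follows; the path and schema are placeholders, not files from this commit.

import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.orc.OrcRowInputFormat;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;

/** Illustrative sketch only: the open/read/close pattern used throughout the tests above. */
class OrcReadLoopSketch {

	public static void main(String[] args) throws Exception {
		OrcRowInputFormat format = new OrcRowInputFormat(
			"/path/to/data.orc",              // placeholder path
			"struct<id:int,name:string>",     // placeholder schema
			new Configuration());

		FileInputSplit[] splits = format.createInputSplits(1);
		format.openInputFormat();
		format.open(splits[0]);

		Row reuse = null;
		while (!format.reachedEnd()) {
			// the returned row may be reused across calls, so copy
			// field values if they must outlive the loop iteration
			reuse = format.nextRecord(reuse);
		}

		format.close();
		format.closeInputFormat();
	}
}
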
http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
deleted file mode 100644
index 2cb1715..0000000
--- a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/OrcUtilsTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.orc;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
-
-import org.apache.orc.TypeDescription;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Unit tests for {@link OrcUtils}.
- *
- */
-public class OrcUtilsTest {
-
-	@Test
-	public void testFlatSchemaToTypeInfo1() {
-
-		String schema =
-			"struct<" +
-				"boolean1:boolean," +
-				"byte1:tinyint," +
-				"short1:smallint," +
-				"int1:int," +
-				"long1:bigint," +
-				"float1:float," +
-				"double1:double," +
-				"bytes1:binary," +
-				"string1:string," +
-				"date1:date," +
-				"timestamp1:timestamp," +
-				"decimal1:decimal(5,2)" +
-			">";
-		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
-		Assert.assertNotNull(typeInfo);
-		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
-		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
-		// validate field types
-		Assert.assertArrayEquals(
-			new TypeInformation[]{
-				Types.BOOLEAN, Types.BYTE, Types.SHORT, Types.INT, Types.LONG, Types.FLOAT, Types.DOUBLE,
-				PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, Types.STRING,
-				Types.SQL_DATE, Types.SQL_TIMESTAMP, BasicTypeInfo.BIG_DEC_TYPE_INFO
-			},
-			rowTypeInfo.getFieldTypes());
-
-		// validate field names
-		Assert.assertArrayEquals(
-			new String[] {
-				"boolean1", "byte1", "short1", "int1", "long1", "float1", "double1",
-				"bytes1", "string1", "date1", "timestamp1", "decimal1"
-			},
-			rowTypeInfo.getFieldNames());
-
-	}
-
-	@Test
-	public void testNestedSchemaToTypeInfo1() {
-
-		String schema =
-			"struct<" +
-				"middle:struct<" +
-					"list:array<" +
-						"struct<" +
-							"int1:int," +
-							"string1:string" +
-						">" +
-					">" +
-				">," +
-				"list:array<" +
-					"struct<" +
-						"int1:int," +
-						"string1:string" +
-					">" +
-				">," +
-				"map:map<" +
-					"string," +
-					"struct<" +
-						"int1:int," +
-						"string1:string" +
-					">" +
-				">" +
-			">";
-		TypeInformation typeInfo = OrcUtils.schemaToTypeInfo(TypeDescription.fromString(schema));
-
-		Assert.assertNotNull(typeInfo);
-		Assert.assertTrue(typeInfo instanceof RowTypeInfo);
-		RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
-
-		// validate field types
-		Assert.assertArrayEquals(
-			new TypeInformation[]{
-				Types.ROW_NAMED(
-					new String[]{"list"},
-					ObjectArrayTypeInfo.getInfoFor(
-						Types.ROW_NAMED(
-							new String[]{"int1", "string1"},
-							Types.INT, Types.STRING
-						)
-					)
-				),
-				ObjectArrayTypeInfo.getInfoFor(
-					Types.ROW_NAMED(
-						new String[]{"int1", "string1"},
-						Types.INT, Types.STRING
-					)
-				),
-				new MapTypeInfo<>(
-					Types.STRING,
-					Types.ROW_NAMED(
-						new String[]{"int1", "string1"},
-						Types.INT, Types.STRING
-					)
-				)
-			},
-			rowTypeInfo.getFieldTypes());
-
-		// validate field names
-		Assert.assertArrayEquals(
-			new String[] {"middle", "list", "map"},
-			rowTypeInfo.getFieldNames());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
new file mode 100644
index 0000000..9d3be63
--- /dev/null
+++ b/flink-connectors/flink-orc/src/test/java/org/apache/flink/orc/util/OrcTestFileGenerator.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.orc.util;
+
+import org.apache.flink.orc.OrcRowInputFormatTest;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A generator for ORC test files.
+ */
+public class OrcTestFileGenerator {
+
+	public static void main(String[] args) throws IOException {
+		writeCompositeTypesWithNullsFile(args[0]);
+		// swap in this call to regenerate the repeating-values test file instead
+//		writeCompositeTypesWithRepeatingFile(args[0]);
+	}
+
+	/**
+	 * Writes an ORC file with nested composite types and null values on different levels.
+	 * Generates {@link OrcRowInputFormatTest#TEST_FILE_COMPOSITES_NULLS}.
+	 */
+	private static void writeCompositeTypesWithNullsFile(String path) throws IOException {
+
+		Path filePath = new Path(path);
+		Configuration conf = new Configuration();
+
+		TypeDescription schema =
+			TypeDescription.fromString(
+				"struct<" +
+					"int1:int," +
+					"record1:struct<f1:int,f2:string>," +
+					"list1:array<array<array<struct<f1:string,f2:string>>>>," +
+					"list2:array<map<string,int>>" +
+				">");
+
+		Writer writer =
+			OrcFile.createWriter(filePath,
+				OrcFile.writerOptions(conf).setSchema(schema));
+
+		VectorizedRowBatch batch = schema.createRowBatch();
+		LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+
+		StructColumnVector record1 = (StructColumnVector) batch.cols[1];
+		LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+		BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+
+		ListColumnVector list1 = (ListColumnVector) batch.cols[2];
+		ListColumnVector nestedList = (ListColumnVector) list1.child;
+		ListColumnVector nestedList2 = (ListColumnVector) nestedList.child;
+		StructColumnVector listEntries = (StructColumnVector) nestedList2.child;
+		BytesColumnVector entryField1 = (BytesColumnVector) listEntries.fields[0];
+		BytesColumnVector entryField2 = (BytesColumnVector) listEntries.fields[1];
+
+		ListColumnVector list2 = (ListColumnVector) batch.cols[3];
+		MapColumnVector map1 = (MapColumnVector) list2.child;
+		BytesColumnVector keys = (BytesColumnVector) map1.keys;
+		LongColumnVector vals = (LongColumnVector) map1.values;
+
+		final int list1Size = 4;
+		final int nestedListSize = 3;
+		final int nestedList2Size = 2;
+		final int list2Size = 3;
+		final int mapSize = 3;
+
+		final int batchSize = batch.getMaxSize();
+
+		// Ensure the vectors have sufficient capacity
+		nestedList.ensureSize(batchSize * list1Size, false);
+		nestedList2.ensureSize(batchSize * list1Size * nestedListSize, false);
+		listEntries.ensureSize(batchSize * list1Size * nestedListSize * nestedList2Size, false);
+		map1.ensureSize(batchSize * list2Size, false);
+		keys.ensureSize(batchSize * list2Size * mapSize, false);
+		vals.ensureSize(batchSize * list2Size * mapSize, false);
+
+		// add 2500 rows to file
+		for (int r = 0; r < 2500; ++r) {
+			int row = batch.size++;
+
+			// mark nullable fields
+			list1.noNulls = false;
+			nestedList.noNulls = false;
+			listEntries.noNulls = false;
+			entryField1.noNulls = false;
+			record1.noNulls = false;
+			record1F2.noNulls = false;
+			list2.noNulls = false;
+			map1.noNulls = false;
+			keys.noNulls = false;
+			vals.noNulls = false;
+
+			// first field: int1
+			int1.vector[row] = r;
+
+			// second field: struct
+			if (row % 2 != 0) {
+				// in every second row, the struct is null
+				record1F1.vector[row] = row;
+				if (row % 5 != 0) {
+					// in every fifth row, the second field of the struct is null
+					record1F2.setVal(row, ("f2-" + row).getBytes(StandardCharsets.UTF_8));
+				} else {
+					record1F2.isNull[row] = true;
+				}
+			} else {
+				record1.isNull[row] = true;
+			}
+
+			// third field: deeply nested list
+			if (row % 3 != 0) {
+				// in every third row, the nested list is null
+				list1.offsets[row] = list1.childCount;
+				list1.lengths[row] = list1Size;
+				list1.childCount += list1Size;
+
+				for (int i = 0; i < list1Size; i++) {
+
+					int listOffset = (int) list1.offsets[row] + i;
+					if (i != 2) {
+						// the third nested list (i == 2) is always null
+						nestedList.offsets[listOffset] = nestedList.childCount;
+						nestedList.lengths[listOffset] = nestedListSize;
+						nestedList.childCount += nestedListSize;
+
+						for (int j = 0; j < nestedListSize; j++) {
+							int nestedOffset = (int) nestedList.offsets[listOffset] + j;
+							nestedList2.offsets[nestedOffset] = nestedList2.childCount;
+							nestedList2.lengths[nestedOffset] = nestedList2Size;
+							nestedList2.childCount += nestedList2Size;
+
+							for (int k = 0; k < nestedList2Size; k++) {
+								int nestedOffset2 = (int) nestedList2.offsets[nestedOffset] + k;
+								// list entries
+								if (k != 1) {
+									// second struct is always null
+									if (k != 0) {
+										// first struct field in first struct is always null
+										entryField1.setVal(nestedOffset2, ("f1-" + k).getBytes(StandardCharsets.UTF_8));
+									} else {
+										entryField1.isNull[nestedOffset2] = true;
+									}
+									entryField2.setVal(nestedOffset2, ("f2-" + k).getBytes(StandardCharsets.UTF_8));
+								} else {
+									listEntries.isNull[nestedOffset2] = true;
+								}
+							}
+						}
+					} else {
+						nestedList.isNull[listOffset] = true;
+					}
+				}
+			} else {
+				list1.isNull[row] = true;
+			}
+
+			// fourth field: map in list
+			if (row % 3 != 0) {
+				// in every third row, the map list is null
+				list2.offsets[row] = list2.childCount;
+				list2.lengths[row] = list2Size;
+				list2.childCount += list2Size;
+
+				for (int i = 0; i < list2Size; i++) {
+					int mapOffset = (int) list2.offsets[row] + i;
+
+					if (i != 2) {
+						// the third map list entry (i == 2) is always null
+						map1.offsets[mapOffset] = map1.childCount;
+						map1.lengths[mapOffset] = mapSize;
+						map1.childCount += mapSize;
+
+						for (int j = 0; j < mapSize; j++) {
+							int mapEntryOffset = (int) map1.offsets[mapOffset] + j;
+
+							if (j != 1) {
+								// key in second map entry is always null
+								keys.setVal(mapEntryOffset, ("key-" + row + "-" + j).getBytes(StandardCharsets.UTF_8));
+							} else {
+								keys.isNull[mapEntryOffset] = true;
+							}
+							if (j != 2) {
+								// value in third map entry is always null
+								vals.vector[mapEntryOffset] = row + i + j;
+							} else {
+								vals.isNull[mapEntryOffset] = true;
+							}
+						}
+					} else {
+						map1.isNull[mapOffset] = true;
+					}
+				}
+			} else {
+				list2.isNull[row] = true;
+			}
+
+			if (row == batchSize - 1) {
+				writer.addRowBatch(batch);
+				batch.reset();
+			}
+		}
+		if (batch.size != 0) {
+			writer.addRowBatch(batch);
+			batch.reset();
+		}
+		writer.close();
+	}
+
+	/**
+	 * Writes an ORC file with nested composite types and repeated values.
+	 * Generates {@link OrcRowInputFormatTest#TEST_FILE_REPEATING}.
+	 */
+	private static void writeCompositeTypesWithRepeatingFile(String path) throws IOException {
+
+		Path filePath = new Path(path);
+		Configuration conf = new Configuration();
+
+		TypeDescription schema =
+			TypeDescription.fromString(
+				"struct<" +
+					"int1:int," +
+					"int2:int," +
+					"int3:int," +
+					"record1:struct<f1:int,f2:string>," +
+					"record2:struct<f1:int,f2:string>," +
+					"list1:array<int>," +
+					"list2:array<int>," +
+					"list3:array<int>," +
+					"map1:map<int,string>," +
+					"map2:map<int,string>" +
+				">");
+
+		Writer writer =
+			OrcFile.createWriter(filePath,
+				OrcFile.writerOptions(conf).setSchema(schema));
+
+		VectorizedRowBatch batch = schema.createRowBatch();
+
+		LongColumnVector int1 = (LongColumnVector) batch.cols[0];
+		LongColumnVector int2 = (LongColumnVector) batch.cols[1];
+		LongColumnVector int3 = (LongColumnVector) batch.cols[2];
+
+		StructColumnVector record1 = (StructColumnVector) batch.cols[3];
+		LongColumnVector record1F1 = (LongColumnVector) record1.fields[0];
+		BytesColumnVector record1F2 = (BytesColumnVector) record1.fields[1];
+		StructColumnVector record2 = (StructColumnVector) batch.cols[4];
+
+		ListColumnVector list1 = (ListColumnVector) batch.cols[5];
+		LongColumnVector list1int = (LongColumnVector) list1.child;
+		ListColumnVector list2 = (ListColumnVector) batch.cols[6];
+		LongColumnVector list2int = (LongColumnVector) list2.child;
+		ListColumnVector list3 = (ListColumnVector) batch.cols[7];
+
+		MapColumnVector map1 = (MapColumnVector) batch.cols[8];
+		LongColumnVector map1keys = (LongColumnVector) map1.keys;
+		BytesColumnVector map1vals = (BytesColumnVector) map1.values;
+		MapColumnVector map2 = (MapColumnVector) batch.cols[9];
+
+		final int listSize = 3;
+		final int mapSize = 2;
+
+		final int batchSize = batch.getMaxSize();
+
+		// Ensure the vectors have sufficient capacity
+		list1int.ensureSize(batchSize * listSize, false);
+		list2int.ensureSize(batchSize * listSize, false);
+		map1keys.ensureSize(batchSize * mapSize, false);
+		map1vals.ensureSize(batchSize * mapSize, false);
+
+		// int1: all values are 42
+		int1.noNulls = true;
+		int1.setRepeating(true);
+		int1.vector[0] = 42;
+
+		// int2: all values are null
+		int2.noNulls = false;
+		int2.setRepeating(true);
+		int2.isNull[0] = true;
+
+		// int3: all values are 99
+		int3.noNulls = false;
+		int3.setRepeating(true);
+		int3.isNull[0] = false;
+		int3.vector[0] = 99;
+
+		// record1: all records are (23, null)
+		record1.noNulls = true;
+		record1.setRepeating(true);
+		for (int i = 0; i < batchSize; i++) {
+			record1F1.vector[i] = i + 23;
+		}
+		record1F2.noNulls = false;
+		record1F2.isNull[0] = true;
+
+		// record2: all records are null
+		record2.noNulls = false;
+		record2.setRepeating(true);
+		record2.isNull[0] = true;
+
+		// list1: all lists are [1, 2, 3]
+		list1.noNulls = true;
+		list1.setRepeating(true);
+		list1.lengths[0] = listSize;
+		list1.offsets[0] = 1;
+		for (int i = 0; i < batchSize * listSize; i++) {
+			list1int.vector[i] = i;
+		}
+
+		// list2: all lists are [7, 7, 7]
+		list2.noNulls = true;
+		list2.setRepeating(true);
+		list2.lengths[0] = listSize;
+		list2.offsets[0] = 0;
+		list2int.setRepeating(true);
+		list2int.vector[0] = 7;
+
+		// list3: all lists are null
+		list3.noNulls = false;
+		list3.setRepeating(true);
+		list3.isNull[0] = true;
+
+		// map1: all maps are [2 -> "HELLO", 4 -> "HELLO"]
+		map1.noNulls = true;
+		map1.setRepeating(true);
+		map1.lengths[0] = mapSize;
+		map1.offsets[0] = 1;
+		for (int i = 0; i < batchSize * mapSize; i++) {
+			map1keys.vector[i] = i * 2;
+		}
+		map1vals.setRepeating(true);
+		map1vals.setVal(0, "Hello".getBytes(StandardCharsets.UTF_8));
+
+		// map2: all maps are null
+		map2.noNulls = false;
+		map2.setRepeating(true);
+		map2.isNull[0] = true;
+
+		batch.size = 256;
+
+		writer.addRowBatch(batch);
+		batch.reset();
+		writer.close();
+	}
+
+}

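The generator above relies on two ColumnVector conventions: isNull[] entries are only honored once noNulls has been set to false, and a repeating vector encodes every row through index 0. A minimal standalone sketch of both, for illustration only:

import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

/** Illustrative sketch only: null and repeating-value encoding in Hive column vectors. */
class ColumnVectorSketch {

	public static void main(String[] args) {
		LongColumnVector col = new LongColumnVector(1024);

		// a concrete value: write into vector[row]
		col.vector[0] = 42;

		// a null: isNull[row] is only honored when noNulls is false
		col.noNulls = false;
		col.isNull[1] = true;

		// a repeating vector encodes the same value (or null) for all
		// rows through index 0, which the generator above exploits
		LongColumnVector repeating = new LongColumnVector(1024);
		repeating.setRepeating(true);
		repeating.vector[0] = 99;
	}
}
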
http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc
new file mode 100644
index 0000000..eed1c55
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-composites-with-nulls.orc differ

http://git-wip-us.apache.org/repos/asf/flink/blob/110b86dd/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc
new file mode 100644
index 0000000..ff2c917
Binary files /dev/null and b/flink-connectors/flink-orc/src/test/resources/test-data-repeating.orc differ

