From: sershe@apache.org
To: commits@hive.apache.org
Date: Tue, 22 Nov 2016 02:46:20 -0000
Subject: [11/35] hive git commit: HIVE-14815: Implement Parquet vectorization reader for Primitive types (Ferdinand Xu, review by Chao Sun) This closes #104
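The patch below adds a VectorizedParquetRecordReader that fills Hive VectorizedRowBatch objects directly from Parquet row groups, plus a unit test and q-file tests. As a rough sketch of how a caller drives the new reader (mirroring TestVectorizedColumnReader further down; the class name here is hypothetical, and it assumes the VectorizedRowBatchCtx has already been registered in the configuration, as the unit test does via MapWork and Utilities.setMapWork):

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.io.IOConstants;
    import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.InputSplit;

    public class VectorizedReadSketch {                                  // hypothetical helper
      // 'split' is an InputSplit over the Parquet file; the conf must already carry the
      // VectorizedRowBatchCtx (the unit test sets it up through MapWork/Utilities.setMapWork).
      static long countRows(InputSplit split, JobConf conf) throws Exception {
        conf.set(IOConstants.COLUMNS, "int32_field");                    // Hive column names
        conf.set(IOConstants.COLUMNS_TYPES, "int");                      // matching Hive types
        conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
        conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");   // project column 0 only
        VectorizedParquetRecordReader reader = new VectorizedParquetRecordReader(split, conf);
        VectorizedRowBatch batch = reader.createValue();
        long rows = 0;
        try {
          while (reader.next(NullWritable.get(), batch)) {
            LongColumnVector col = (LongColumnVector) batch.cols[0];     // int32 values land in a LongColumnVector
            rows += batch.size;                                          // values are in col.vector[0..batch.size)
          }
        } finally {
          reader.close();
        }
        return rows;
      }
    }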
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
new file mode 100644
index 0000000..f94c49a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/vector/VectorizedParquetRecordReader.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.parquet.vector;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase;
+import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
+import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+
+/**
+ * This reader is used to read a batch of records from an InputSplit; parts of the code are
+ * adapted from Apache Spark and Apache Parquet.
+ */
+public class VectorizedParquetRecordReader extends ParquetRecordReaderBase
+  implements RecordReader<NullWritable, VectorizedRowBatch> {
+  public static final Logger LOG = LoggerFactory.getLogger(VectorizedParquetRecordReader.class);
+
+  private List<Integer> colsToInclude;
+
+  protected MessageType fileSchema;
+  protected MessageType requestedSchema;
+  private List<String> columnNamesList;
+  private List<TypeInfo> columnTypesList;
+  private VectorizedRowBatchCtx rbCtx;
+
+  /**
+   * For each requested column, the reader that reads this column. An entry is NULL if the
+   * column is missing from the file, in which case we populate the attribute with NULL.
+   */
+  private VectorizedColumnReader[] columnReaders;
+
+  /**
+   * The number of rows that have been returned.
+   */
+  private long rowsReturned;
+
+  /**
+   * The number of rows that have been read so far, including the current in-flight row group.
+   */
+  private long totalCountLoadedSoFar = 0;
+
+  /**
+   * The total number of rows this RecordReader will eventually read: the sum of the
+   * rows of all the row groups.
+   */
+  protected long totalRowCount;
+
+  @VisibleForTesting
+  public VectorizedParquetRecordReader(
+      InputSplit inputSplit,
+      JobConf conf) {
+    try {
+      serDeStats = new SerDeStats();
+      projectionPusher = new ProjectionPusher();
+      initialize(inputSplit, conf);
+      colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    } catch (Throwable e) {
+      LOG.error("Failed to create the vectorized reader due to exception " + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public VectorizedParquetRecordReader(
+      org.apache.hadoop.mapred.InputSplit oldInputSplit,
+      JobConf conf) {
+    try {
+      serDeStats = new SerDeStats();
+      projectionPusher = new ProjectionPusher();
+      initialize(getSplit(oldInputSplit, conf), conf);
+      colsToInclude = ColumnProjectionUtils.getReadColumnIDs(conf);
+      rbCtx = Utilities.getVectorizedRowBatchCtx(conf);
+    } catch (Throwable e) {
+      LOG.error("Failed to create the vectorized reader due to exception " + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void initialize(
+      InputSplit oldSplit,
+      JobConf configuration) throws IOException, InterruptedException {
+    jobConf = configuration;
+    ParquetMetadata footer;
+    List<BlockMetaData> blocks;
+    ParquetInputSplit split = (ParquetInputSplit) oldSplit;
+    boolean indexAccess =
+      configuration.getBoolean(DataWritableReadSupport.PARQUET_COLUMN_INDEX_ACCESS, false);
+    this.file = split.getPath();
+    long[] rowGroupOffsets = split.getRowGroupOffsets();
+
+    String columnNames = configuration.get(IOConstants.COLUMNS);
+    columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = configuration.get(IOConstants.COLUMNS_TYPES);
+    columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+
+    // if task.side.metadata is set, rowGroupOffsets is null
+    if (rowGroupOffsets == null) {
+      //TODO check whether rowGroupOffSets can be null
+      // then we need to apply the predicate push down filter
+      footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
+      MessageType fileSchema = footer.getFileMetaData().getSchema();
+      FilterCompat.Filter filter = getFilter(configuration);
+      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+    } else {
+      // otherwise we find the row groups that were selected on the client
+      footer = readFooter(configuration, file, NO_FILTER);
+      Set<Long> offsets = new HashSet<>();
+      for (long offset : rowGroupOffsets) {
+        offsets.add(offset);
+      }
+      blocks = new ArrayList<>();
+      for (BlockMetaData block : footer.getBlocks()) {
+        if (offsets.contains(block.getStartingPos())) {
+          blocks.add(block);
+        }
+      }
+      // verify we found them all
+      if (blocks.size() != rowGroupOffsets.length) {
+        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
+        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
+          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
+        }
+        // this should never happen.
+        // provide a good error message in case there's a bug
+        throw new IllegalStateException(
+            "All the offsets listed in the split should be found in the file."
+                + " expected: " + Arrays.toString(rowGroupOffsets)
+                + " found: " + blocks
+                + " out of: " + Arrays.toString(foundRowGroupOffsets)
+                + " in range " + split.getStart() + ", " + split.getEnd());
+      }
+    }
+
+    for (BlockMetaData block : blocks) {
+      this.totalRowCount += block.getRowCount();
+    }
+    this.fileSchema = footer.getFileMetaData().getSchema();
+
+    MessageType tableSchema;
+    if (indexAccess) {
+      List<Integer> indexSequence = new ArrayList<>();
+
+      // Generates a sequence list of indexes
+      for (int i = 0; i < columnNamesList.size(); i++) {
+        indexSequence.add(i);
+      }
+
+      tableSchema = DataWritableReadSupport.getSchemaByIndex(fileSchema, columnNamesList,
+        indexSequence);
+    } else {
+      tableSchema = DataWritableReadSupport.getSchemaByName(fileSchema, columnNamesList,
+        columnTypesList);
+    }
+
+    List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+    if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
+      requestedSchema =
+        DataWritableReadSupport.getSchemaByIndex(tableSchema, columnNamesList, indexColumnsWanted);
+    } else {
+      requestedSchema = fileSchema;
+    }
+
+    this.reader = new ParquetFileReader(
+      configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
+  }
+
+  @Override
+  public boolean next(
+      NullWritable nullWritable,
+      VectorizedRowBatch vectorizedRowBatch) throws IOException {
+    return nextBatch(vectorizedRowBatch);
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return NullWritable.get();
+  }
+
+  @Override
+  public VectorizedRowBatch createValue() {
+    return rbCtx.createVectorizedRowBatch();
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    //TODO
+    return 0;
+  }
+
+  @Override
+  public void close() throws IOException {
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    //TODO
+    return 0;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  private boolean nextBatch(VectorizedRowBatch columnarBatch) throws IOException {
+    columnarBatch.reset();
+    if (rowsReturned >= totalRowCount) {
+      return false;
+    }
+    checkEndOfRowGroup();
+
+    int num = (int) Math.min(VectorizedRowBatch.DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned);
+    for (int i = 0; i < columnReaders.length; ++i) {
+      if (columnReaders[i] == null) {
+        continue;
+      }
+      columnarBatch.cols[colsToInclude.get(i)].isRepeating = true;
+      columnReaders[i].readBatch(num, columnarBatch.cols[colsToInclude.get(i)],
+        columnTypesList.get(colsToInclude.get(i)));
+    }
+    rowsReturned += num;
+    columnarBatch.size = num;
+    return true;
+  }
+
+  private void checkEndOfRowGroup() throws IOException {
+    if (rowsReturned != totalCountLoadedSoFar) {
+      return;
+    }
+    PageReadStore pages = reader.readNextRowGroup();
+    if (pages == null) {
+      throw new IOException("expecting more rows but reached last block. Read "
+          + rowsReturned + " out of " + totalRowCount);
+    }
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<Type> types = requestedSchema.getFields();
+    columnReaders = new VectorizedColumnReader[columns.size()];
+    for (int i = 0; i < columns.size(); ++i) {
+      columnReaders[i] =
+        new VectorizedColumnReader(columns.get(i), pages.getPageReader(columns.get(i)),
+          skipTimestampConversion, types.get(i));
+    }
+    totalCountLoadedSoFar += pages.getRowCount();
+  }
+}
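The batch sizing in nextBatch()/checkEndOfRowGroup() above caps each batch at VectorizedRowBatch.DEFAULT_SIZE rows and never reads past the current row group; a small self-contained illustration of that arithmetic (the row counts are made up for the example):

    public class BatchSizingSketch {                 // illustrative only; numbers are invented
      public static void main(String[] args) {
        final int DEFAULT_SIZE = 1024;               // VectorizedRowBatch.DEFAULT_SIZE
        long totalCountLoadedSoFar = 1000;           // rows covered by the row groups read so far
        long rowsReturned = 900;                     // rows already handed back to the caller
        // A batch never exceeds DEFAULT_SIZE and never crosses the current row-group boundary:
        int num = (int) Math.min(DEFAULT_SIZE, totalCountLoadedSoFar - rowsReturned);
        System.out.println(num);                     // 100; after returning them, rowsReturned == totalCountLoadedSoFar
        // At that point checkEndOfRowGroup() calls reader.readNextRowGroup() and adds that
        // group's row count to totalCountLoadedSoFar before the next batch is filled.
      }
    }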
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
new file mode 100644
index 0000000..276ff19
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/TestVectorizedColumnReader.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.parquet;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.ql.io.parquet.vector.VectorizedParquetRecordReader;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.TestCase.assertFalse;
+import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
+import static org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
+import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
+import static org.junit.Assert.assertEquals;
+
+public class TestVectorizedColumnReader {
+
+  private static final int nElements = 2500;
+  protected static final Configuration conf = new Configuration();
+  protected static final Path file =
+    new Path("target/test/TestParquetVectorReader/testParquetFile");
+  private static String[] uniqueStrs = new String[nElements];
+  private static boolean[] isNulls = new boolean[nElements];
+  private static Random random = new Random();
+  protected static final MessageType schema = parseMessageType(
+    "message test { "
+      + "required int32 int32_field; "
+      + "required int64 int64_field; "
+      + "required int96 int96_field; "
+      + "required double double_field; "
+      + "required float float_field; "
+      + "required boolean boolean_field; "
+      + "required fixed_len_byte_array(3) flba_field; "
+      + "optional fixed_len_byte_array(1) some_null_field; "
+      + "optional fixed_len_byte_array(1) all_null_field; "
+      + "optional binary binary_field; "
+      + "optional binary binary_field_non_repeating; "
+      + "} ");
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    FileSystem fs = file.getFileSystem(conf);
+    if (fs.exists(file)) {
+      fs.delete(file, true);
+    }
+  }
+
+  @BeforeClass
+  public static void prepareFile() throws IOException {
+    cleanup();
+
+    boolean dictionaryEnabled = true;
+    boolean validating = false;
+    GroupWriteSupport.setSchema(schema, conf);
+    SimpleGroupFactory f = new SimpleGroupFactory(schema);
+    ParquetWriter<Group> writer = new ParquetWriter<Group>(
+      file,
+      new GroupWriteSupport(),
+      GZIP, 1024*1024, 1024, 1024*1024,
+      dictionaryEnabled, validating, PARQUET_1_0, conf);
+    writeData(f, writer);
+  }
+
+  protected static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer) throws IOException {
+    initialStrings(uniqueStrs);
+    for (int i = 0; i < nElements; i++) {
+      Group group = f.newGroup()
+        .append("int32_field", i)
+        .append("int64_field", (long) 2 * i)
+        .append("int96_field", Binary.fromReusedByteArray("999999999999".getBytes()))
+        .append("double_field", i * 1.0)
+        .append("float_field", ((float) (i * 2.0)))
+        .append("boolean_field", i % 5 == 0)
+        .append("flba_field", "abc");
+
+      if (i % 2 == 1) {
+        group.append("some_null_field", "x");
+      }
+
+      if (i % 13 != 1) {
+        int binaryLen = i % 10;
+        group.append("binary_field",
+          Binary.fromString(new String(new char[binaryLen]).replace("\0", "x")));
+      }
+
+      if (uniqueStrs[i] != null) {
+        group.append("binary_field_non_repeating", Binary.fromString(uniqueStrs[i]));
+      }
+      writer.write(group);
+    }
+    writer.close();
+  }
+
+  private static String getRandomStr() {
+    int len = random.nextInt(10);
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < len; i++) {
+      sb.append((char) ('a' + random.nextInt(25)));
+    }
+    return sb.toString();
+  }
+
+  public static void initialStrings(String[] uniqueStrs) {
+    for (int i = 0; i < uniqueStrs.length; i++) {
+      String str = getRandomStr();
+      if (!str.isEmpty()) {
+        uniqueStrs[i] = str;
+        isNulls[i] = false;
+      } else {
+        isNulls[i] = true;
+      }
+    }
+  }
+
+  private VectorizedParquetRecordReader createParquetReader(String schemaString, Configuration conf)
+    throws IOException, InterruptedException, HiveException {
+    conf.set(PARQUET_READ_SCHEMA, schemaString);
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+
+    Job vectorJob = new Job(conf, "read vector");
+    ParquetInputFormat.setInputPaths(vectorJob, file);
+    ParquetInputFormat parquetInputFormat = new ParquetInputFormat(GroupReadSupport.class);
+    InputSplit split = (InputSplit) parquetInputFormat.getSplits(vectorJob).get(0);
+    initialVectorizedRowBatchCtx(conf);
+    return new VectorizedParquetRecordReader(split, new JobConf(conf));
+  }
+
+  private void initialVectorizedRowBatchCtx(Configuration conf) throws HiveException {
+    MapWork mapWork = new MapWork();
+    VectorizedRowBatchCtx rbCtx = new VectorizedRowBatchCtx();
+    rbCtx.init(createStructObjectInspector(conf), new String[0]);
+    mapWork.setVectorMode(true);
+    mapWork.setVectorizedRowBatchCtx(rbCtx);
+    Utilities.setMapWork(conf, mapWork);
+  }
+
+  private StructObjectInspector createStructObjectInspector(Configuration conf) {
+    // Create row related objects
+    String columnNames = conf.get(IOConstants.COLUMNS);
+    List<String> columnNamesList = DataWritableReadSupport.getColumnNames(columnNames);
+    String columnTypes = conf.get(IOConstants.COLUMNS_TYPES);
+    List<TypeInfo> columnTypesList = DataWritableReadSupport.getColumnTypes(columnTypes);
+    TypeInfo rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);
+    return new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo);
+  }
+
+  @Test
+  public void testIntRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "int32_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "int");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required int32 int32_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals(c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testLongRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "int64_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "bigint");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required int64 int64_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals(2 * c, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testDoubleRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "double_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "double");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required double double_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals(1.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testFloatRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "float_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "float");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required float float_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        DoubleColumnVector vector = (DoubleColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          assertEquals((float) 2.0 * c, vector.vector[i], 0);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBooleanRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "boolean_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "boolean");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required boolean boolean_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    try {
+      long c = 0;
+      while (reader.next(NullWritable.get(), previous)) {
+        LongColumnVector vector = (LongColumnVector) previous.cols[0];
+        assertTrue(vector.noNulls);
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          int e = (c % 5 == 0) ? 1 : 0;
+          assertEquals(e, vector.vector[i]);
+          assertFalse(vector.isNull[i]);
+          c++;
+        }
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBinaryReadDictionaryEncoding() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "binary_field");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required binary binary_field;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          if (c % 13 == 1) {
+            assertTrue(vector.isNull[i]);
+          } else {
+            assertFalse(vector.isNull[i]);
+            int binaryLen = c % 10;
+            String expected = new String(new char[binaryLen]).replace("\0", "x");
+            String actual = new String(ArrayUtils
+              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("Failed at " + c, expected, actual);
+            noNull = false;
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals(nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+
+  @Test
+  public void testBinaryRead() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(IOConstants.COLUMNS, "binary_field_non_repeating");
+    conf.set(IOConstants.COLUMNS_TYPES, "string");
+    conf.setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
+    conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "0");
+    VectorizedParquetRecordReader reader =
+      createParquetReader("message test { required binary binary_field_non_repeating;}", conf);
+    VectorizedRowBatch previous = reader.createValue();
+    int c = 0;
+    try {
+      while (reader.next(NullWritable.get(), previous)) {
+        BytesColumnVector vector = (BytesColumnVector) previous.cols[0];
+        boolean noNull = true;
+        for (int i = 0; i < vector.vector.length; i++) {
+          if (c == nElements) {
+            break;
+          }
+          String actual;
+          assertEquals("Null assert failed at " + c, isNulls[c], vector.isNull[i]);
+          if (!vector.isNull[i]) {
+            actual = new String(ArrayUtils
+              .subarray(vector.vector[i], vector.start[i], vector.start[i] + vector.length[i]));
+            assertEquals("failed at " + c, uniqueStrs[c], actual);
+          } else {
+            noNull = false;
+          }
+          c++;
+        }
+        assertEquals("No Null check failed at " + c, noNull, vector.noNulls);
+        assertFalse(vector.isRepeating);
+      }
+      assertEquals("It doesn't exit at expected position", nElements, c);
+    } finally {
+      reader.close();
+    }
+  }
+}
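The binary tests above pull string cells back out of a BytesColumnVector by slicing vector[i] at start[i]/length[i]; a compact helper sketch of that pattern (the class and method names here are hypothetical):

    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;

    public class BytesVectorSketch {
      // Decode row i of a BytesColumnVector the way the tests above do.
      static String valueAt(BytesColumnVector v, int i) {
        if (!v.noNulls && v.isNull[i]) {
          return null;                                              // null cell
        }
        return new String(v.vector[i], v.start[i], v.length[i]);    // byte slice for row i
      }
    }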
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
new file mode 100644
index 0000000..7de444f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_types_non_dictionary_encoding_vectorization.q
@@ -0,0 +1,94 @@
+set hive.mapred.mode=nonstrict;
+DROP TABLE parquet_types_staging;
+DROP TABLE parquet_types;
+
+set hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.reduce.enabled=true;
+set hive.vectorized.use.row.serde.deserialize=true;
+set hive.vectorized.use.vector.serde.deserialize=true;
+set hive.vectorized.execution.reduce.groupby.enabled = true;
+
+CREATE TABLE parquet_types_staging (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary string,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':';
+
+CREATE TABLE parquet_types (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary binary,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) STORED AS PARQUET;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_non_dictionary_types.txt' OVERWRITE INTO TABLE
+parquet_types_staging;
+
+SELECT * FROM parquet_types_staging;
+
+INSERT OVERWRITE TABLE parquet_types
+SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar,
+unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging;
+
+-- test types in group by
+
+EXPLAIN SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+EXPLAIN SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+
+EXPLAIN SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+
+EXPLAIN SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+
+EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+
+EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
+SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/parquet_types_vectorization.q b/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
new file mode 100644
index 0000000..bb0e5b2
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/parquet_types_vectorization.q
@@ -0,0 +1,96 @@
+set hive.mapred.mode=nonstrict;
+DROP TABLE parquet_types_staging;
+DROP TABLE parquet_types;
+
+set hive.vectorized.execution.enabled=true;
+set hive.vectorized.execution.reduce.enabled=true;
+set hive.vectorized.use.row.serde.deserialize=true;
+set hive.vectorized.use.vector.serde.deserialize=true;
+set hive.vectorized.execution.reduce.groupby.enabled = true;
+
+CREATE TABLE parquet_types_staging (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary string,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) ROW FORMAT DELIMITED
+FIELDS TERMINATED BY '|'
+COLLECTION ITEMS TERMINATED BY ','
+MAP KEYS TERMINATED BY ':';
+
+CREATE TABLE parquet_types (
+  cint int,
+  ctinyint tinyint,
+  csmallint smallint,
+  cfloat float,
+  cdouble double,
+  cstring1 string,
+  t timestamp,
+  cchar char(5),
+  cvarchar varchar(10),
+  cbinary binary,
+  m1 map<string, varchar(3)>,
+  l1 array<int>,
+  st1 struct<c1:int, c2:char(1)>,
+  d date
+) STORED AS PARQUET;
+
+LOAD DATA LOCAL INPATH '../../data/files/parquet_types.txt' OVERWRITE INTO TABLE parquet_types_staging;
+
+SELECT * FROM parquet_types_staging;
+
+INSERT OVERWRITE TABLE parquet_types
+SELECT cint, ctinyint, csmallint, cfloat, cdouble, cstring1, t, cchar, cvarchar,
+unhex(cbinary), m1, l1, st1, d FROM parquet_types_staging;
+
+-- test types in group by
+
+EXPLAIN SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+SELECT ctinyint,
+  MAX(cint),
+  MIN(csmallint),
+  COUNT(cstring1),
+  ROUND(AVG(cfloat), 5),
+  ROUND(STDDEV_POP(cdouble),5)
+FROM parquet_types
+GROUP BY ctinyint
+ORDER BY ctinyint
+;
+
+EXPLAIN SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+SELECT cfloat, count(*) FROM parquet_types GROUP BY cfloat ORDER BY cfloat;
+
+EXPLAIN SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+SELECT cchar, count(*) FROM parquet_types GROUP BY cchar ORDER BY cchar;
+
+EXPLAIN SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+SELECT cvarchar, count(*) FROM parquet_types GROUP BY cvarchar ORDER BY cvarchar;
+
+EXPLAIN SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+SELECT cstring1, count(*) FROM parquet_types GROUP BY cstring1 ORDER BY cstring1;
+
+EXPLAIN SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t;
+SELECT t, count(*) FROM parquet_types GROUP BY t ORDER BY t;
+
+EXPLAIN SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
+SELECT hex(cbinary), count(*) FROM parquet_types GROUP BY cbinary;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out b/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
index 8345132..e42453d 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_parquet.q.out
@@ -150,7 +150,7 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: tinyint)
                         Statistics: Num rows: 12288 Data size: 73728 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: int), _col2 (type: smallint), _col3 (type: bigint), _col4 (type: struct), _col5 (type: struct)
-            Execution mode: llap
+            Execution mode: vectorized, llap
             LLAP IO: no inputs
         Reducer 2 
             Execution mode: llap
http://git-wip-us.apache.org/repos/asf/hive/blob/936df7a1/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out b/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
index b49d5dd..0524cb3 100644
--- a/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
+++ b/ql/src/test/results/clientpositive/llap/vectorized_parquet_types.q.out
@@ -250,19 +250,19 @@ Stage-0
   limit:-1
   Stage-1
     Reducer 3 vectorized, llap
-    File Output Operator [FS_10]
-      Select Operator [SEL_9] (rows=11 width=11)
+    File Output Operator [FS_12]
+      Select Operator [SEL_11] (rows=11 width=11)
        Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"]
      <-Reducer 2 [SIMPLE_EDGE] llap
        SHUFFLE [RS_6]
          Group By Operator [GBY_4] (rows=11 width=11)
            Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(VALUE._col0)","min(VALUE._col1)","count(VALUE._col2)","avg(VALUE._col3)","stddev_pop(VALUE._col4)","max(VALUE._col5)"],keys:KEY._col0
-          <-Map 1 [SIMPLE_EDGE] llap
+          <-Map 1 [SIMPLE_EDGE] vectorized, llap
            SHUFFLE [RS_3]
              PartitionCols:_col0
-            Group By Operator [GBY_2] (rows=22 width=11)
+            Group By Operator [GBY_10] (rows=22 width=11)
                Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6"],aggregations:["max(cint)","min(csmallint)","count(cstring1)","avg(cfloat)","stddev_pop(cdouble)","max(cdecimal)"],keys:ctinyint
-            Select Operator [SEL_1] (rows=22 width=11)
+            Select Operator [SEL_9] (rows=22 width=11)
                Output:["ctinyint","cint","csmallint","cstring1","cfloat","cdouble","cdecimal"]
                TableScan [TS_0] (rows=22 width=11)
                  default@parquet_types,parquet_types,Tbl:COMPLETE,Col:NONE,Output:["cint","ctinyint","csmallint","cfloat","cdouble","cstring1","cdecimal"]