From: sershe@apache.org
To: commits@hive.apache.org
Date: Fri, 18 Nov 2016 19:07:36 -0000
Subject: [3/4] hive git commit: HIVE-14089 : complex type support in LLAP IO is broken (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java index ebbdf8d..d5f5f9d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedTreeReaderFactory.java @@ -17,18 +17,26 @@ */ package org.apache.hadoop.hive.ql.io.orc.encoded; +import org.apache.orc.impl.RunLengthByteReader; +import org.apache.orc.impl.StreamName; + import java.io.IOException; import java.util.List; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch; import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData; import org.apache.orc.CompressionCodec; +import org.apache.orc.TypeDescription; +import org.apache.orc.TypeDescription.Category; import org.apache.orc.impl.PositionProvider; import org.apache.orc.impl.SettableUncompressedStream; import org.apache.orc.impl.TreeReaderFactory; import org.apache.orc.OrcProto; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EncodedTreeReaderFactory extends TreeReaderFactory { + private static final Logger LOG = LoggerFactory.getLogger(EncodedTreeReaderFactory.class); /** * We choose to use a toy programming language, so we cannot use multiple inheritance.
* If we could, we could have this inherit TreeReader to contain the common impl, and then @@ -36,7 +44,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { * Instead, we have a settable interface that the caller will cast to and call setBuffers. */ public interface SettableTreeReader { - void setBuffers(ColumnStreamData[] streamBuffers, boolean sameStripe) throws IOException; + void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException; } public static class TimestampStreamReader extends TimestampTreeReader @@ -84,8 +92,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -198,6 +207,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override + public void seek(PositionProvider[] index) throws IOException { + // This string reader should simply redirect to its own seek (what other types already do). + this.seek(index[columnId]); + } + + @Override public void seek(PositionProvider index) throws IOException { if (present != null) { if (_isFileCompressed) { @@ -211,7 +226,7 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. - if (_dataStream.available() > 0) { + if (_dataStream != null && _dataStream.available() > 0) { if (_isFileCompressed) { index.getNext(); } @@ -222,14 +237,14 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { // data stream could be empty stream or already reached end of stream before present stream. // This can happen if all values in stream are nulls or last row group values are all null. 
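      // Note (annotation, not part of the diff): _dataStream/_lengthStream can themselves be null when no
      // such stream exists for this column in the batch, hence the null checks added before available() below.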
- if (_dataStream.available() > 0) { + if (_dataStream != null && _dataStream.available() > 0) { if (_isFileCompressed) { index.getNext(); } ((StringDirectTreeReader) reader).getStream().seek(index); } - if (_lengthStream.available() > 0) { + if (_lengthStream != null && _lengthStream.available() > 0) { if (_isFileCompressed) { index.getNext(); } @@ -239,8 +254,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -371,8 +387,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -468,8 +485,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -571,8 +589,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -668,8 +687,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -758,8 +778,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -859,8 +880,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) 
{ _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -979,8 +1001,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -1065,6 +1088,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override + public void seek(PositionProvider[] index) throws IOException { + // This string reader should simply redirect to its own seek (what other types already do). + this.seek(index[columnId]); + } + + @Override public void seek(PositionProvider index) throws IOException { if (present != null) { if (_isFileCompressed) { @@ -1106,8 +1135,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -1233,6 +1263,12 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override + public void seek(PositionProvider[] index) throws IOException { + // This string reader should simply redirect to its own seek (what other types already do). + this.seek(index[columnId]); + } + + @Override public void seek(PositionProvider index) throws IOException { if (present != null) { if (_isFileCompressed) { @@ -1274,8 +1310,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -1411,8 +1448,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -1511,8 +1549,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -1617,8 +1656,9 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } @Override - public void setBuffers(ColumnStreamData[] streamsData, boolean sameStripe) + public void setBuffers(EncodedColumnBatch 
batch, boolean sameStripe) throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); if (_presentStream != null) { _presentStream.setBuffers(StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); } @@ -1673,177 +1713,760 @@ public class EncodedTreeReaderFactory extends TreeReaderFactory { } } - public static TreeReader[] createEncodedTreeReader(int numCols, - List types, - List encodings, - EncodedColumnBatch batch, - CompressionCodec codec, boolean skipCorrupt, - String writerTimezone) throws IOException { - TreeReader[] treeReaders = new TreeReader[numCols]; - for (int i = 0; i < numCols; i++) { - int columnIndex = batch.getColumnIxs()[i]; - ColumnStreamData[] streamBuffers = batch.getColumnData()[i]; - OrcProto.Type columnType = types.get(columnIndex); - - // EncodedColumnBatch is already decompressed, we don't really need to pass codec. - // But we need to know if the original data is compressed or not. This is used to skip - // positions in row index properly. If the file is originally compressed, - // then 1st position (compressed offset) in row index should be skipped to get - // uncompressed offset, else 1st position should not be skipped. - // TODO: there should be a better way to do this, code just needs to be modified - OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex); - - // stream buffers are arranged in enum order of stream kind - ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE], - data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE], - dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE], - lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE], - secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE]; - - switch (columnType.getKind()) { - case BINARY: - treeReaders[i] = BinaryStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setLengthStream(lengths) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; - case BOOLEAN: - treeReaders[i] = BooleanStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .build(); - break; - case BYTE: - treeReaders[i] = ByteStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .build(); - break; - case SHORT: - treeReaders[i] = ShortStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; - case INT: - treeReaders[i] = IntStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; - case LONG: - treeReaders[i] = LongStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .skipCorrupt(skipCorrupt) - .build(); - break; - case FLOAT: - treeReaders[i] = FloatStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setCompressionCodec(codec) - .build(); - break; - case DOUBLE: - treeReaders[i] = DoubleStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - 
.setCompressionCodec(codec) - .build(); - break; - case CHAR: - treeReaders[i] = CharStreamReader.builder() - .setColumnIndex(columnIndex) - .setMaxLength(columnType.getMaximumLength()) - .setPresentStream(present) - .setDataStream(data) - .setLengthStream(lengths) - .setDictionaryStream(dictionary) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; - case VARCHAR: - treeReaders[i] = VarcharStreamReader.builder() - .setColumnIndex(columnIndex) - .setMaxLength(columnType.getMaximumLength()) - .setPresentStream(present) - .setDataStream(data) - .setLengthStream(lengths) - .setDictionaryStream(dictionary) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; - case STRING: - treeReaders[i] = StringStreamReader.builder() - .setColumnIndex(columnIndex) - .setPresentStream(present) - .setDataStream(data) - .setLengthStream(lengths) - .setDictionaryStream(dictionary) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; - case DECIMAL: - treeReaders[i] = DecimalStreamReader.builder() - .setColumnIndex(columnIndex) - .setPrecision(columnType.getPrecision()) - .setScale(columnType.getScale()) - .setPresentStream(present) - .setValueStream(data) - .setScaleStream(secondary) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) - .build(); - break; - case TIMESTAMP: - treeReaders[i] = TimestampStreamReader.builder() + public static StructTreeReader createRootTreeReader(TypeDescription schema, + List encodings, EncodedColumnBatch batch, + CompressionCodec codec, boolean skipCorrupt, String tz, int[] columnMapping) + throws IOException { + if (schema.getCategory() != Category.STRUCT) { + throw new AssertionError("Schema is not a struct: " + schema); + } + // Some child types may be excluded. Note that this can only happen at root level. + List children = schema.getChildren(); + int childCount = children.size(), includedCount = 0; + for (int childIx = 0; childIx < childCount; ++childIx) { + if (!batch.hasData(children.get(childIx).getId())) { + if (LOG.isDebugEnabled()) { + LOG.debug("Column at " + childIx + " " + children.get(childIx).getId() + + ":" + children.get(childIx).toString() + " has no data"); + } + continue; + } + ++includedCount; + } + TreeReader[] childReaders = new TreeReader[includedCount]; + for (int schemaChildIx = 0, inclChildIx = -1; schemaChildIx < childCount; ++schemaChildIx) { + if (!batch.hasData(children.get(schemaChildIx).getId())) continue; + childReaders[++inclChildIx] = createEncodedTreeReader( + schema.getChildren().get(schemaChildIx), encodings, batch, codec, skipCorrupt, tz); + columnMapping[inclChildIx] = schemaChildIx; + } + return StructStreamReader.builder() + .setColumnIndex(0) + .setCompressionCodec(codec) + .setColumnEncoding(encodings.get(0)) + .setChildReaders(childReaders) + .build(); + } + + + private static TreeReader createEncodedTreeReader(TypeDescription schema, + List encodings, EncodedColumnBatch batch, + CompressionCodec codec, boolean skipCorrupt, String tz) throws IOException { + int columnIndex = schema.getId(); + ColumnStreamData[] streamBuffers = batch.getColumnData(columnIndex); + + // EncodedColumnBatch is already decompressed, we don't really need to pass codec. + // But we need to know if the original data is compressed or not. This is used to skip + // positions in row index properly. 
If the file is originally compressed, + // then 1st position (compressed offset) in row index should be skipped to get + // uncompressed offset, else 1st position should not be skipped. + // TODO: there should be a better way to do this, code just needs to be modified + OrcProto.ColumnEncoding columnEncoding = encodings.get(columnIndex); + + // stream buffers are arranged in enum order of stream kind + ColumnStreamData present = streamBuffers[OrcProto.Stream.Kind.PRESENT_VALUE], + data = streamBuffers[OrcProto.Stream.Kind.DATA_VALUE], + dictionary = streamBuffers[OrcProto.Stream.Kind.DICTIONARY_DATA_VALUE], + lengths = streamBuffers[OrcProto.Stream.Kind.LENGTH_VALUE], + secondary = streamBuffers[OrcProto.Stream.Kind.SECONDARY_VALUE]; + + + if (LOG.isDebugEnabled()) { + LOG.debug("columnIndex: {} columnType: {} streamBuffers.length: {} columnEncoding: {}" + + " present: {} data: {} dictionary: {} lengths: {} secondary: {} tz: {}", + columnIndex, schema, streamBuffers.length, columnEncoding, present != null, + data != null, dictionary != null, lengths != null, secondary != null, tz); + } + switch (schema.getCategory()) { + case BINARY: + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case CHAR: + case VARCHAR: + case STRING: + case DECIMAL: + case TIMESTAMP: + case DATE: + return getPrimitiveTreeReaders(columnIndex, schema, codec, columnEncoding, + present, data, dictionary, lengths, secondary, skipCorrupt, tz); + case LIST: + TypeDescription elementType = schema.getChildren().get(0); + TreeReader elementReader = createEncodedTreeReader( + elementType, encodings, batch, codec, skipCorrupt, tz); + return ListStreamReader.builder() + .setColumnIndex(columnIndex) + .setColumnEncoding(columnEncoding) + .setCompressionCodec(codec) + .setPresentStream(present) + .setLengthStream(lengths) + .setElementReader(elementReader) + .build(); + case MAP: + TypeDescription keyType = schema.getChildren().get(0); + TypeDescription valueType = schema.getChildren().get(1); + TreeReader keyReader = createEncodedTreeReader( + keyType, encodings, batch, codec, skipCorrupt, tz); + TreeReader valueReader = createEncodedTreeReader( + valueType, encodings, batch, codec, skipCorrupt, tz); + return MapStreamReader.builder() + .setColumnIndex(columnIndex) + .setColumnEncoding(columnEncoding) + .setCompressionCodec(codec) + .setPresentStream(present) + .setLengthStream(lengths) + .setKeyReader(keyReader) + .setValueReader(valueReader) + .build(); + case STRUCT: { + int childCount = schema.getChildren().size(); + TreeReader[] childReaders = new TreeReader[childCount]; + for (int i = 0; i < childCount; i++) { + TypeDescription childType = schema.getChildren().get(i); + childReaders[i] = createEncodedTreeReader( + childType, encodings, batch, codec, skipCorrupt, tz); + } + return StructStreamReader.builder() + .setColumnIndex(columnIndex) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .setPresentStream(present) + .setChildReaders(childReaders) + .build(); + } + case UNION: { + int childCount = schema.getChildren().size(); + TreeReader[] childReaders = new TreeReader[childCount]; + for (int i = 0; i < childCount; i++) { + TypeDescription childType = schema.getChildren().get(i); + childReaders[i] = createEncodedTreeReader( + childType, encodings, batch, codec, skipCorrupt, tz); + } + return UnionStreamReader.builder() .setColumnIndex(columnIndex) - .setPresentStream(present) - .setSecondsStream(data) - .setNanosStream(secondary) .setCompressionCodec(codec) 
.setColumnEncoding(columnEncoding) - .setWriterTimezone(writerTimezone) - .skipCorrupt(skipCorrupt) - .build(); - break; - case DATE: - treeReaders[i] = DateStreamReader.builder() - .setColumnIndex(columnIndex) .setPresentStream(present) .setDataStream(data) - .setCompressionCodec(codec) - .setColumnEncoding(columnEncoding) + .setChildReaders(childReaders) .build(); - break; - default: - throw new UnsupportedOperationException("Data type not supported yet! " + columnType); + } + default: + throw new UnsupportedOperationException("Data type not supported: " + schema); + } + } + + private static TreeReader getPrimitiveTreeReaders(final int columnIndex, + TypeDescription columnType, CompressionCodec codec, OrcProto.ColumnEncoding columnEncoding, + ColumnStreamData present, ColumnStreamData data, ColumnStreamData dictionary, + ColumnStreamData lengths, ColumnStreamData secondary, boolean skipCorrupt, String tz) + throws IOException { + switch (columnType.getCategory()) { + case BINARY: + return BinaryStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case BOOLEAN: + return BooleanStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case BYTE: + return ByteStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case SHORT: + return ShortStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case INT: + return IntStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case LONG: + return LongStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .skipCorrupt(skipCorrupt) + .build(); + case FLOAT: + return FloatStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case DOUBLE: + return DoubleStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .build(); + case CHAR: + return CharStreamReader.builder() + .setColumnIndex(columnIndex) + .setMaxLength(columnType.getMaxLength()) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setDictionaryStream(dictionary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case VARCHAR: + return VarcharStreamReader.builder() + .setColumnIndex(columnIndex) + .setMaxLength(columnType.getMaxLength()) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setDictionaryStream(dictionary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case STRING: + return StringStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setLengthStream(lengths) + .setDictionaryStream(dictionary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case DECIMAL: + return 
DecimalStreamReader.builder() + .setColumnIndex(columnIndex) + .setPrecision(columnType.getPrecision()) + .setScale(columnType.getScale()) + .setPresentStream(present) + .setValueStream(data) + .setScaleStream(secondary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + case TIMESTAMP: + return TimestampStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setSecondsStream(data) + .setNanosStream(secondary) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .setWriterTimezone(tz) + .skipCorrupt(skipCorrupt) + .build(); + case DATE: + return DateStreamReader.builder() + .setColumnIndex(columnIndex) + .setPresentStream(present) + .setDataStream(data) + .setCompressionCodec(codec) + .setColumnEncoding(columnEncoding) + .build(); + default: + throw new AssertionError("Not a primitive category: " + columnType.getCategory()); + } + } + + protected static class ListStreamReader extends ListTreeReader implements SettableTreeReader { + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _lengthStream; + + public ListStreamReader(final int columnIndex, + final SettableUncompressedStream present, final SettableUncompressedStream lengthStream, + final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed, + final TreeReader elementReader) throws IOException { + super(columnIndex, present, lengthStream, columnEncoding, elementReader); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._lengthStream = lengthStream; + } + + @Override + public void seek(PositionProvider[] index) throws IOException { + PositionProvider ownIndex = index[columnId]; + if (present != null) { + if (_isFileCompressed) { + ownIndex.getNext(); + } + present.seek(ownIndex); + } + + // lengths stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_lengthStream.available() > 0) { + if (_isFileCompressed) { + ownIndex.getNext(); + } + lengths.seek(ownIndex); + elementReader.seek(index); } } - return treeReaders; + @Override + public void seek(PositionProvider index) throws IOException { + // Only our parent class can call this. 
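      // Note (annotation, not part of the diff): complex-type encoded readers are driven through
      // seek(PositionProvider[]) and pick out index[columnId] themselves, so a single-index seek
      // has no well-defined position to use here.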
+ throw new IOException("Should never be called"); + } + + @Override + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) + throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); + if (_presentStream != null) { + _presentStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); + } + if (_lengthStream != null) { + _lengthStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE])); + } + + if (elementReader != null) { + ((SettableTreeReader) elementReader).setBuffers(batch, sameStripe); + } + } + + public static class StreamReaderBuilder { + private int columnIndex; + private ColumnStreamData presentStream; + private ColumnStreamData lengthStream; + private CompressionCodec compressionCodec; + private OrcProto.ColumnEncoding columnEncoding; + private TreeReader elementReader; + + + public ListStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) { + this.columnIndex = columnIndex; + return this; + } + + public ListStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) { + this.lengthStream = lengthStream; + return this; + } + + public ListStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) { + this.presentStream = presentStream; + return this; + } + + public ListStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { + this.columnEncoding = encoding; + return this; + } + + public ListStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + + public ListStreamReader.StreamReaderBuilder setElementReader(TreeReader elementReader) { + this.elementReader = elementReader; + return this; + } + + public ListStreamReader build() throws IOException { + SettableUncompressedStream present = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(), + presentStream); + + SettableUncompressedStream length = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(), + lengthStream); + + boolean isFileCompressed = compressionCodec != null; + return new ListStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed, + elementReader); + } + } + + public static ListStreamReader.StreamReaderBuilder builder() { + return new ListStreamReader.StreamReaderBuilder(); + } + } + + protected static class MapStreamReader extends MapTreeReader implements SettableTreeReader{ + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _lengthStream; + + public MapStreamReader(final int columnIndex, + final SettableUncompressedStream present, final SettableUncompressedStream lengthStream, + final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed, + final TreeReader keyReader, final TreeReader valueReader) throws IOException { + super(columnIndex, present, lengthStream, columnEncoding, keyReader, valueReader); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + this._lengthStream = lengthStream; + } + + @Override + public void seek(PositionProvider[] index) throws IOException { + // We are not calling super.seek since we handle the present stream differently. 
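      // Note (annotation, not part of the diff): when the file is compressed, the first recorded
      // row-index position is the compressed-block offset; the LLAP streams are already decompressed,
      // so that entry is consumed with getNext() and discarded before seeking.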
+ PositionProvider ownIndex = index[columnId]; + if (present != null) { + if (_isFileCompressed) { + ownIndex.getNext(); + } + present.seek(ownIndex); + } + + // lengths stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_lengthStream.available() > 0) { + if (_isFileCompressed) { + ownIndex.getNext(); + } + lengths.seek(ownIndex); + keyReader.seek(index); + valueReader.seek(index); + } + } + + @Override + public void seek(PositionProvider index) throws IOException { + // Only our parent class can call this. + throw new IOException("Should never be called"); + } + + @Override + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) + throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); + if (_presentStream != null) { + _presentStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); + } + if (_lengthStream != null) { + _lengthStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.LENGTH_VALUE])); + } + + if (keyReader != null) { + ((SettableTreeReader) keyReader).setBuffers(batch, sameStripe); + } + + if (valueReader != null) { + ((SettableTreeReader) valueReader).setBuffers(batch, sameStripe); + } + } + + public static class StreamReaderBuilder { + private int columnIndex; + private ColumnStreamData presentStream; + private ColumnStreamData lengthStream; + private CompressionCodec compressionCodec; + private OrcProto.ColumnEncoding columnEncoding; + private TreeReader keyReader; + private TreeReader valueReader; + + + public MapStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) { + this.columnIndex = columnIndex; + return this; + } + + public MapStreamReader.StreamReaderBuilder setLengthStream(ColumnStreamData lengthStream) { + this.lengthStream = lengthStream; + return this; + } + + public MapStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) { + this.presentStream = presentStream; + return this; + } + + public MapStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { + this.columnEncoding = encoding; + return this; + } + + public MapStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + + public MapStreamReader.StreamReaderBuilder setKeyReader(TreeReader keyReader) { + this.keyReader = keyReader; + return this; + } + + public MapStreamReader.StreamReaderBuilder setValueReader(TreeReader valueReader) { + this.valueReader = valueReader; + return this; + } + + public MapStreamReader build() throws IOException { + SettableUncompressedStream present = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(), + presentStream); + + SettableUncompressedStream length = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.LENGTH.name(), + lengthStream); + + boolean isFileCompressed = compressionCodec != null; + return new MapStreamReader(columnIndex, present, length, columnEncoding, isFileCompressed, + keyReader, valueReader); + } + } + + public static MapStreamReader.StreamReaderBuilder builder() { + return new MapStreamReader.StreamReaderBuilder(); + } + } + + protected static class StructStreamReader extends StructTreeReader + implements SettableTreeReader { + private boolean _isFileCompressed; + private 
SettableUncompressedStream _presentStream; + + public StructStreamReader(final int columnIndex, + final SettableUncompressedStream present, + final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed, + final TreeReader[] childReaders) throws IOException { + super(columnIndex, present, columnEncoding, childReaders); + this._isFileCompressed = isFileCompressed; + this._presentStream = present; + } + + @Override + public void seek(PositionProvider[] index) throws IOException { + PositionProvider ownIndex = index[columnId]; + if (present != null) { + if (_isFileCompressed) { + ownIndex.getNext(); + } + present.seek(ownIndex); + } + if (fields != null) { + for (TreeReader child : fields) { + child.seek(index); + } + } + } + + @Override + public void seek(PositionProvider index) throws IOException { + // Only our parent class can call this. + throw new IOException("Should never be called"); + } + + + @Override + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) + throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); + if (_presentStream != null) { + _presentStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); + } + if (fields != null) { + for (TreeReader child : fields) { + ((SettableTreeReader) child).setBuffers(batch, sameStripe); + } + } + } + + public static class StreamReaderBuilder { + private int columnIndex; + private ColumnStreamData presentStream; + private CompressionCodec compressionCodec; + private OrcProto.ColumnEncoding columnEncoding; + private TreeReader[] childReaders; + + + public StructStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) { + this.columnIndex = columnIndex; + return this; + } + + public StructStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) { + this.presentStream = presentStream; + return this; + } + + public StructStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { + this.columnEncoding = encoding; + return this; + } + + public StructStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + + public StructStreamReader.StreamReaderBuilder setChildReaders(TreeReader[] childReaders) { + this.childReaders = childReaders; + return this; + } + + public StructStreamReader build() throws IOException { + SettableUncompressedStream present = StreamUtils + .createSettableUncompressedStream(OrcProto.Stream.Kind.PRESENT.name(), + presentStream); + + boolean isFileCompressed = compressionCodec != null; + return new StructStreamReader(columnIndex, present, columnEncoding, isFileCompressed, + childReaders); + } + } + + public static StructStreamReader.StreamReaderBuilder builder() { + return new StructStreamReader.StreamReaderBuilder(); + } + } + + protected static class UnionStreamReader extends UnionTreeReader implements SettableTreeReader { + private boolean _isFileCompressed; + private SettableUncompressedStream _presentStream; + private SettableUncompressedStream _dataStream; + + public UnionStreamReader(final int columnIndex, + final SettableUncompressedStream present, final SettableUncompressedStream dataStream, + final OrcProto.ColumnEncoding columnEncoding, final boolean isFileCompressed, + final TreeReader[] childReaders) throws IOException { + super(columnIndex, present, columnEncoding, childReaders); + this._isFileCompressed = isFileCompressed; + this._presentStream = 
present; + this._dataStream = dataStream; + // Note: other parent readers init everything in ctor, but union does it in startStripe. + this.tags = new RunLengthByteReader(dataStream); + } + + @Override + public void seek(PositionProvider[] index) throws IOException { + PositionProvider ownIndex = index[columnId]; + if (present != null) { + if (_isFileCompressed) { + ownIndex.getNext(); + } + present.seek(ownIndex); + } + + // lengths stream could be empty stream or already reached end of stream before present stream. + // This can happen if all values in stream are nulls or last row group values are all null. + if (_dataStream.available() > 0) { + if (_isFileCompressed) { + ownIndex.getNext(); + } + tags.seek(ownIndex); + if (fields != null) { + for (TreeReader child : fields) { + child.seek(index); + } + } + } + } + + @Override + public void seek(PositionProvider index) throws IOException { + // Only our parent class can call this. + throw new IOException("Should never be called"); + } + + @Override + public void setBuffers(EncodedColumnBatch batch, boolean sameStripe) + throws IOException { + ColumnStreamData[] streamsData = batch.getColumnData(columnId); + if (_presentStream != null) { + _presentStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.PRESENT_VALUE])); + } + if (_dataStream != null) { + _dataStream.setBuffers( + StreamUtils.createDiskRangeInfo(streamsData[OrcProto.Stream.Kind.DATA_VALUE])); + } + if (fields != null) { + for (TreeReader child : fields) { + ((SettableTreeReader) child).setBuffers(batch, sameStripe); + } + } + } + + public static class StreamReaderBuilder { + private int columnIndex; + private ColumnStreamData presentStream; + private ColumnStreamData dataStream; + private CompressionCodec compressionCodec; + private OrcProto.ColumnEncoding columnEncoding; + private TreeReader[] childReaders; + + + public UnionStreamReader.StreamReaderBuilder setColumnIndex(int columnIndex) { + this.columnIndex = columnIndex; + return this; + } + + public UnionStreamReader.StreamReaderBuilder setDataStream(ColumnStreamData dataStream) { + this.dataStream = dataStream; + return this; + } + + public UnionStreamReader.StreamReaderBuilder setPresentStream(ColumnStreamData presentStream) { + this.presentStream = presentStream; + return this; + } + + public UnionStreamReader.StreamReaderBuilder setColumnEncoding(OrcProto.ColumnEncoding encoding) { + this.columnEncoding = encoding; + return this; + } + + public UnionStreamReader.StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) { + this.compressionCodec = compressionCodec; + return this; + } + + public UnionStreamReader.StreamReaderBuilder setChildReaders(TreeReader[] childReaders) { + this.childReaders = childReaders; + return this; + } + + public UnionStreamReader build() throws IOException { + SettableUncompressedStream present = StreamUtils.createSettableUncompressedStream( + OrcProto.Stream.Kind.PRESENT.name(), presentStream); + + SettableUncompressedStream data = StreamUtils.createSettableUncompressedStream( + OrcProto.Stream.Kind.DATA.name(), dataStream); + + boolean isFileCompressed = compressionCodec != null; + return new UnionStreamReader(columnIndex, present, data, + columnEncoding, isFileCompressed, childReaders); + } + } + + public static UnionStreamReader.StreamReaderBuilder builder() { + return new UnionStreamReader.StreamReaderBuilder(); + } } } 
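
For orientation, here is a minimal sketch of how a caller might drive the readers built by this factory, using only the signatures visible in the patch above (createRootTreeReader, SettableTreeReader.setBuffers(EncodedColumnBatch, boolean), and seek(PositionProvider[])). The surrounding class, the method name readOneRowGroup, and the way its arguments are obtained are illustrative assumptions, not code from this commit; generic type arguments are omitted because they are not shown in this excerpt.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory.SettableTreeReader;
import org.apache.orc.CompressionCodec;
import org.apache.orc.OrcProto;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.PositionProvider;
import org.apache.orc.impl.TreeReaderFactory;

/** Hypothetical driver illustrating the intended call sequence; not part of the patch. */
public class EncodedReaderUsageSketch {
  @SuppressWarnings({"rawtypes", "unchecked"})
  static void readOneRowGroup(TypeDescription fileSchema, List<OrcProto.ColumnEncoding> encodings,
      EncodedColumnBatch batch, CompressionCodec codec, String writerTimezone,
      PositionProvider[] rowGroupPositions, boolean sameStripe) throws IOException {
    // Filled by the factory: for each included child reader, its position in the file schema.
    int[] columnMapping = new int[fileSchema.getChildren().size()];
    // One root STRUCT reader is built over the top-level columns that have data in the batch.
    TreeReaderFactory.StructTreeReader root = EncodedTreeReaderFactory.createRootTreeReader(
        fileSchema, encodings, batch, codec, /* skipCorrupt */ false, writerTimezone,
        columnMapping);
    // Each nested reader locates its own streams via batch.getColumnData(columnId).
    ((SettableTreeReader) root).setBuffers(batch, sameStripe);
    // Each nested reader consumes its own entry, index[columnId], from the position array.
    root.seek(rowGroupPositions);
    // From here, the usual TreeReader.nextVector(...) calls (not shown) would fill the
    // output vectorized batch.
  }
}
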
http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java index 4405232..1c5f0e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/Reader.java @@ -46,7 +46,7 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader { public static final int ALL_RGS = -1; /** * All the previous streams are data streams, this and the next ones are index streams. - * We assume the sort will stay the same for backward compat. + * We assume the order will stay the same for backward compat. */ public static final int MAX_DATA_STREAMS = OrcProto.Stream.Kind.ROW_INDEX.getNumber(); public void init(Object fileKey, int stripeIx, int rgIx, int columnCount) { @@ -57,6 +57,10 @@ public interface Reader extends org.apache.hadoop.hive.ql.io.orc.Reader { } resetColumnArrays(columnCount); } + + public void initOrcColumn(int colIx) { + super.initColumn(colIx, MAX_DATA_STREAMS); + } } /** http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index 5cc3663..601324a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -278,30 +278,35 @@ public class MapWork extends BaseWork { } // check if the column types that are read are supported by LLAP IO - for (Map.Entry> entry : aliasToWork.entrySet()) { - if (hasLlap) { - final String alias = entry.getKey(); - Operator op = entry.getValue(); - PartitionDesc partitionDesc = aliasToPartnInfo.get(alias); - if (op instanceof TableScanOperator && partitionDesc != null && - partitionDesc.getTableDesc() != null) { - final TableScanOperator tsOp = (TableScanOperator) op; - final List readColumnNames = tsOp.getNeededColumns(); - final Properties props = partitionDesc.getTableDesc().getProperties(); - final List typeInfos = TypeInfoUtils.getTypeInfosFromTypeString( - props.getProperty(serdeConstants.LIST_COLUMN_TYPES)); - final List allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos); - final List allColumnNames = Utilities.getColumnNames(props); - hasLlap = Utilities.checkLlapIOSupportedTypes(readColumnNames, allColumnNames, - allColumnTypes); - } - } + if (hasLlap) { + // TODO: no need for now hasLlap = checkVectorizerSupportedTypes(); } llapIoDesc = deriveLlapIoDescString( isLlapOn, canWrapAny, hasPathToPartInfo, hasLlap, hasNonLlap, hasAcid); } + private boolean checkVectorizerSupportedTypes(boolean hasLlap) { + for (Map.Entry> entry : aliasToWork.entrySet()) { + final String alias = entry.getKey(); + Operator op = entry.getValue(); + PartitionDesc partitionDesc = aliasToPartnInfo.get(alias); + if (op instanceof TableScanOperator && partitionDesc != null && + partitionDesc.getTableDesc() != null) { + final TableScanOperator tsOp = (TableScanOperator) op; + final List readColumnNames = tsOp.getNeededColumns(); + final Properties props = partitionDesc.getTableDesc().getProperties(); + final List typeInfos = TypeInfoUtils.getTypeInfosFromTypeString( + 
props.getProperty(serdeConstants.LIST_COLUMN_TYPES)); + final List allColumnTypes = TypeInfoUtils.getTypeStringsFromTypeInfo(typeInfos); + final List allColumnNames = Utilities.getColumnNames(props); + hasLlap = Utilities.checkVectorizerSupportedTypes(readColumnNames, allColumnNames, + allColumnTypes); + } + } + return hasLlap; + } + private static String deriveLlapIoDescString(boolean isLlapOn, boolean canWrapAny, boolean hasPathToPartInfo, boolean hasLlap, boolean hasNonLlap, boolean hasAcid) { if (!isLlapOn) return null; // LLAP IO is off, don't output. http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/queries/clientpositive/vector_complex_all.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/vector_complex_all.q b/ql/src/test/queries/clientpositive/vector_complex_all.q index 91a7368..b71ac62 100644 --- a/ql/src/test/queries/clientpositive/vector_complex_all.q +++ b/ql/src/test/queries/clientpositive/vector_complex_all.q @@ -1,9 +1,11 @@ set hive.compute.query.using.stats=false; -set hive.compute.query.using.stats=false; +set hive.strict.checks.cartesian.product=false; set hive.cli.print.header=true; set hive.explain.user=false; set hive.fetch.task.conversion=none; SET hive.vectorized.execution.enabled=true; +set hive.llap.io.enabled=false; +set hive.mapred.mode=nonstrict; CREATE TABLE orc_create_staging ( str STRING, @@ -21,25 +23,45 @@ CREATE TABLE orc_create_complex ( str STRING, mp MAP, lst ARRAY, - strct STRUCT -) STORED AS ORC; + strct STRUCT, + val string +) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000"); -INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging; +INSERT OVERWRITE TABLE orc_create_complex +SELECT orc_create_staging.*, '0' FROM orc_create_staging; --- Since complex types are not supported, this query should not vectorize. -EXPLAIN -SELECT * FROM orc_create_complex; +set hive.llap.io.enabled=true; SELECT * FROM orc_create_complex; --- However, since this query is not referencing the complex fields, it should vectorize. -EXPLAIN -SELECT COUNT(*) FROM orc_create_complex; +SELECT str FROM orc_create_complex; + +SELECT strct, mp, lst FROM orc_create_complex; + +SELECT lst, str FROM orc_create_complex; + +SELECT mp, str FROM orc_create_complex; + +SELECT strct, str FROM orc_create_complex; + +SELECT strct.B, str FROM orc_create_complex; + +set hive.llap.io.enabled=false; + +INSERT INTO TABLE orc_create_complex +SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2; + +select count(*) from orc_create_complex; + +set hive.llap.io.enabled=true; + +SELECT distinct lst, strct FROM orc_create_complex; + +SELECT str, count(val) FROM orc_create_complex GROUP BY str; + +SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B; + +SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str; -SELECT COUNT(*) FROM orc_create_complex; --- Also, since this query is not referencing the complex fields, it should vectorize. 
-EXPLAIN -SELECT str FROM orc_create_complex ORDER BY str; -SELECT str FROM orc_create_complex ORDER BY str; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out index 08d49bc..f16bb16 100644 --- a/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out +++ b/ql/src/test/results/clientpositive/llap/vector_complex_all.q.out @@ -34,8 +34,9 @@ PREHOOK: query: CREATE TABLE orc_create_complex ( str STRING, mp MAP, lst ARRAY, - strct STRUCT -) STORED AS ORC + strct STRUCT, + val string +) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000") PREHOOK: type: CREATETABLE PREHOOK: Output: database:default PREHOOK: Output: default@orc_create_complex @@ -43,16 +44,19 @@ POSTHOOK: query: CREATE TABLE orc_create_complex ( str STRING, mp MAP, lst ARRAY, - strct STRUCT -) STORED AS ORC + strct STRUCT, + val string +) STORED AS ORC tblproperties("orc.row.index.stride"="1000", "orc.stripe.size"="1000", "orc.compress.size"="10000") POSTHOOK: type: CREATETABLE POSTHOOK: Output: database:default POSTHOOK: Output: default@orc_create_complex -PREHOOK: query: INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging +PREHOOK: query: INSERT OVERWRITE TABLE orc_create_complex +SELECT orc_create_staging.*, '0' FROM orc_create_staging PREHOOK: type: QUERY PREHOOK: Input: default@orc_create_staging PREHOOK: Output: default@orc_create_complex -POSTHOOK: query: INSERT OVERWRITE TABLE orc_create_complex SELECT * FROM orc_create_staging +POSTHOOK: query: INSERT OVERWRITE TABLE orc_create_complex +SELECT orc_create_staging.*, '0' FROM orc_create_staging POSTHOOK: type: QUERY POSTHOOK: Input: default@orc_create_staging POSTHOOK: Output: default@orc_create_complex @@ -60,199 +64,166 @@ POSTHOOK: Lineage: orc_create_complex.lst SIMPLE [(orc_create_staging)orc_create POSTHOOK: Lineage: orc_create_complex.mp SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:mp, type:map, comment:null), ] POSTHOOK: Lineage: orc_create_complex.str SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:str, type:string, comment:null), ] POSTHOOK: Lineage: orc_create_complex.strct SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:strct, type:struct, comment:null), ] -orc_create_staging.str orc_create_staging.mp orc_create_staging.lst orc_create_staging.strct -PREHOOK: query: -- Since complex types are not supported, this query should not vectorize. -EXPLAIN -SELECT * FROM orc_create_complex +POSTHOOK: Lineage: orc_create_complex.val SIMPLE [] +orc_create_staging.str orc_create_staging.mp orc_create_staging.lst orc_create_staging.strct c1 +PREHOOK: query: SELECT * FROM orc_create_complex PREHOOK: type: QUERY -POSTHOOK: query: -- Since complex types are not supported, this query should not vectorize. 
-EXPLAIN
-SELECT * FROM orc_create_complex
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT * FROM orc_create_complex
 POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
-  Stage: Stage-1
-    Tez
+POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Vertices:
-        Map 1
-            Map Operator Tree:
-                TableScan
-                  alias: orc_create_complex
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: str (type: string), mp (type: map), lst (type: array), strct (type: struct)
-                    outputColumnNames: _col0, _col1, _col2, _col3
-                    Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                    File Output Operator
-                      compressed: false
-                      Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                      table:
-                          input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                          output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-            Execution mode: llap
-            LLAP IO: no inputs
-
-  Stage: Stage-0
-    Fetch Operator
-      limit: -1
-      Processor Tree:
-        ListSink
-
-PREHOOK: query: SELECT * FROM orc_create_complex
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-POSTHOOK: query: SELECT * FROM orc_create_complex
+orc_create_complex.str orc_create_complex.mp orc_create_complex.lst orc_create_complex.strct orc_create_complex.val
+line1 {"key13":"value13","key11":"value11","key12":"value12"} ["a","b","c"] {"a":"one","b":"two"} 0
+line2 {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"] {"a":"three","b":"four"} 0
+line3 {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"] {"a":"five","b":"six"} 0
+PREHOOK: query: SELECT str FROM orc_create_complex
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-POSTHOOK: query: SELECT * FROM orc_create_complex
+POSTHOOK: query: SELECT str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+str
+line1
+line2
+line3
+PREHOOK: query: SELECT strct, mp, lst FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, mp, lst FROM orc_create_complex
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-orc_create_complex.str orc_create_complex.mp orc_create_complex.lst orc_create_complex.strct
-line1 {"key13":"value13","key11":"value11","key12":"value12"} ["a","b","c"] {"a":"one","b":"two"}
-line2 {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"] {"a":"three","b":"four"}
-line3 {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"] {"a":"five","b":"six"}
-PREHOOK: query: -- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex
+strct mp lst
+{"a":"one","b":"two"} {"key13":"value13","key11":"value11","key12":"value12"} ["a","b","c"]
+{"a":"three","b":"four"} {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"]
+{"a":"five","b":"six"} {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"]
+PREHOOK: query: SELECT lst, str FROM orc_create_complex
 PREHOOK: type: QUERY
-POSTHOOK: query: -- However, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT COUNT(*) FROM orc_create_complex
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT lst, str FROM orc_create_complex
 POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
-  Stage: Stage-1
-    Tez
+POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+lst str
+["a","b","c"] line1
+["d","e","f"] line2
+["g","h","i"] line3
+PREHOOK: query: SELECT mp, str FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Vertices:
-        Map 1
-            Map Operator Tree:
-                TableScan
-                  alias: orc_create_complex
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: COMPLETE
-                  Select Operator
-                    Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: COMPLETE
-                    Group By Operator
-                      aggregations: count()
-                      mode: hash
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                        value expressions: _col0 (type: bigint)
-            Execution mode: vectorized, llap
-            LLAP IO: all inputs
-        Reducer 2
-            Execution mode: vectorized, llap
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: count(VALUE._col0)
-                mode: mergepartial
-                outputColumnNames: _col0
-                Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
-  Stage: Stage-0
-    Fetch Operator
-      limit: -1
-      Processor Tree:
-        ListSink
-
-PREHOOK: query: SELECT COUNT(*) FROM orc_create_complex
+POSTHOOK: query: SELECT mp, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+mp str
+{"key13":"value13","key11":"value11","key12":"value12"} line1
+{"key21":"value21","key22":"value22","key23":"value23"} line2
+{"key31":"value31","key32":"value32","key33":"value33"} line3
+PREHOOK: query: SELECT strct, str FROM orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+strct str
+{"a":"one","b":"two"} line1
+{"a":"three","b":"four"} line2
+{"a":"five","b":"six"} line3
+PREHOOK: query: SELECT strct.B, str FROM orc_create_complex
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-POSTHOOK: query: SELECT COUNT(*) FROM orc_create_complex
+POSTHOOK: query: SELECT strct.B, str FROM orc_create_complex
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+b str
+two line1
+four line2
+six line3
+Warning: Shuffle Join MERGEJOIN[15][tables = [$hdt$_1, $hdt$_2, $hdt$_3, $hdt$_0]] in Stage 'Reducer 2' is a cross product
+PREHOOK: query: INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_staging
+PREHOOK: Input: default@src
+PREHOOK: Output: default@orc_create_complex
+POSTHOOK: query: INSERT INTO TABLE orc_create_complex
+SELECT orc_create_staging.*, src1.key FROM orc_create_staging cross join src src1 cross join orc_create_staging spam1 cross join orc_create_staging spam2
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_staging
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@orc_create_complex
+POSTHOOK: Lineage: orc_create_complex.lst SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:lst, type:array, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.mp SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:mp, type:map, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.str SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:str, type:string, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.strct SIMPLE [(orc_create_staging)orc_create_staging.FieldSchema(name:strct, type:struct, comment:null), ]
+POSTHOOK: Lineage: orc_create_complex.val SIMPLE [(src)src1.FieldSchema(name:key, type:string, comment:default), ]
+orc_create_staging.str orc_create_staging.mp orc_create_staging.lst orc_create_staging.strct src1.key
+PREHOOK: query: select count(*) from orc_create_complex
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: select count(*) from orc_create_complex
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
 c0
-3
+13503
+PREHOOK: query: SELECT distinct lst, strct FROM orc_create_complex
 PREHOOK: type: QUERY
-POSTHOOK: query: -- Also, since this query is not referencing the complex fields, it should vectorize.
-EXPLAIN
-SELECT str FROM orc_create_complex ORDER BY str
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT distinct lst, strct FROM orc_create_complex
 POSTHOOK: type: QUERY
-Explain
-STAGE DEPENDENCIES:
-  Stage-1 is a root stage
-  Stage-0 depends on stages: Stage-1
-
-STAGE PLANS:
-  Stage: Stage-1
-    Tez
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+lst strct
+["a","b","c"] {"a":"one","b":"two"}
+["d","e","f"] {"a":"three","b":"four"}
+["g","h","i"] {"a":"five","b":"six"}
+PREHOOK: query: SELECT str, count(val) FROM orc_create_complex GROUP BY str
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Edges:
-        Reducer 2 <- Map 1 (SIMPLE_EDGE)
+POSTHOOK: query: SELECT str, count(val) FROM orc_create_complex GROUP BY str
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-      Vertices:
-        Map 1
-            Map Operator Tree:
-                TableScan
-                  alias: orc_create_complex
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: str (type: string)
-                    outputColumnNames: _col0
-                    Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string)
-                      sort order: +
-                      Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-            Execution mode: vectorized, llap
-            LLAP IO: all inputs
-        Reducer 2
-            Execution mode: vectorized, llap
-            Reduce Operator Tree:
-              Select Operator
-                expressions: KEY.reducesinkkey0 (type: string)
-                outputColumnNames: _col0
-                Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                File Output Operator
-                  compressed: false
-                  Statistics: Num rows: 3 Data size: 3177 Basic stats: COMPLETE Column stats: NONE
-                  table:
-                      input format: org.apache.hadoop.mapred.SequenceFileInputFormat
-                      output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
-                      serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
-
-  Stage: Stage-0
-    Fetch Operator
-      limit: -1
-      Processor Tree:
-        ListSink
-
-PREHOOK: query: SELECT str FROM orc_create_complex ORDER BY str
+str c1
+line1 4501
+line2 4501
+line3 4501
+PREHOOK: query: SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B
 PREHOOK: type: QUERY
 PREHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-POSTHOOK: query: SELECT str FROM orc_create_complex ORDER BY str
+POSTHOOK: query: SELECT strct.B, count(val) FROM orc_create_complex GROUP BY strct.B
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@orc_create_complex
 #### A masked pattern was here ####
-str
-line1
-line2
-line3
+strct.b _c1
+four 4501
+six 4501
+two 4501
+PREHOOK: query: SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str
+PREHOOK: type: QUERY
+PREHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+POSTHOOK: query: SELECT strct, mp, lst, str, count(val) FROM orc_create_complex GROUP BY strct, mp, lst, str
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@orc_create_complex
+#### A masked pattern was here ####
+strct mp lst str c4
+{"a":"one","b":"two"} {"key11":"value11","key12":"value12","key13":"value13"} ["a","b","c"] line1 4501
+{"a":"three","b":"four"} {"key21":"value21","key22":"value22","key23":"value23"} ["d","e","f"] line2 4501
+{"a":"five","b":"six"} {"key31":"value31","key32":"value32","key33":"value33"} ["g","h","i"] line3 4501

http://git-wip-us.apache.org/repos/asf/hive/blob/6efa869f/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
index 97d5642..133b8ef 100644
--- a/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
+++ b/ql/src/test/results/clientpositive/llap/vector_complex_join.q.out
@@ -90,7 +90,7 @@ STAGE PLANS:
                       Statistics: Num rows: 1 Data size: 190 Basic stats: COMPLETE Column stats: NONE
                       value expressions: _col1 (type: map)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator
@@ -211,7 +211,7 @@ STAGE PLANS:
                       Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                       value expressions: a (type: array)
             Execution mode: llap
-            LLAP IO: no inputs
+            LLAP IO: all inputs
 
   Stage: Stage-0
     Fetch Operator