From: prasan...@apache.org
Subject: svn commit: r1670992 [1/5] - in /hive/branches/llap: llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/ ql/src/java/o...
Date: Thu, 02 Apr 2015 23:32:01 GMT
Author: prasanthj
Date: Thu Apr  2 23:32:01 2015
New Revision: 1670992

URL: http://svn.apache.org/r1670992
Log:
HIVE-10158: LLAP: Varchar columns are not handled in encoded readers (Prasanth Jayachandran)

Added:
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedRecordReaderImplFactory.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImplFactory.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SettableUncompressedStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/StreamUtils.java
Removed:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/branches/llap/ql/src/test/queries/clientpositive/llap_partitioned.q
    hive/branches/llap/ql/src/test/results/clientpositive/llap_partitioned.q.out

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java?rev=1670992&r1=1670991&r2=1670992&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java Thu Apr  2 23:32:01 2015
@@ -22,7 +22,6 @@ package org.apache.hadoop.hive.llap.io.a
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 
 import org.apache.hadoop.hive.llap.Consumer;
 import org.apache.hadoop.hive.llap.ConsumerFeedback;
@@ -150,14 +149,14 @@ public class LlapInputFormat
         throw new IOException(e);
       }
       if (cvb == null) return false;
-      int[] columnMap = rbCtx.getIncludedColumnIndexes();
-      if (columnMap.length != cvb.cols.length) {
-        throw new RuntimeException("Unexpected number of columns, VRB has " + columnMap.length
+      if (columnIds.size() != cvb.cols.length) {
+        throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
             + " included, but the reader returned " + cvb.cols.length);
       }
       // VRB was created from VrbCtx, so we already have pre-allocated column vectors
       for (int i = 0; i < cvb.cols.length; ++i) {
-        value.cols[columnMap[i]] = cvb.cols[i]; // TODO: reuse CV objects that are replaced
+        int columnId = columnIds.get(i);
+        value.cols[columnId] = cvb.cols[i]; // TODO: reuse CV objects that are replaced
       }
       value.selectedInUse = false;
       value.size = cvb.size;
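
For context on the projection change in this hunk: the old code rebuilt an int[] column map from VectorizedRowBatchCtx on every call, while the new code reuses the reader's own columnIds list to place each decoded column vector into its slot in the output VectorizedRowBatch. A minimal, self-contained sketch of that mapping follows; the class and variable names (ColumnProjectionSketch, DummyBatch, projectedColumnIds) are hypothetical stand-ins, and only the copy loop mirrors the committed change.

    import java.util.Arrays;
    import java.util.List;

    public class ColumnProjectionSketch {
      // Hypothetical stand-in for ColumnVectorBatch / VectorizedRowBatch columns.
      static class DummyBatch {
        Object[] cols;
        DummyBatch(int n) { cols = new Object[n]; }
      }

      public static void main(String[] args) {
        // Suppose the reader was asked for columns 1 and 3 of a 5-column row.
        List<Integer> projectedColumnIds = Arrays.asList(1, 3);

        DummyBatch cvb = new DummyBatch(2);   // decoded data, one slot per projected column
        cvb.cols[0] = "decoded col 1";
        cvb.cols[1] = "decoded col 3";

        DummyBatch value = new DummyBatch(5); // output batch, one slot per table column

        // Same shape as the loop in the hunk above: place each decoded column
        // vector into the output slot named by its projected column id.
        for (int i = 0; i < cvb.cols.length; ++i) {
          int columnId = projectedColumnIds.get(i);
          value.cols[columnId] = cvb.cols[i];
        }

        System.out.println(Arrays.toString(value.cols));
        // prints: [null, decoded col 1, null, decoded col 3, null]
      }
    }

Slots not covered by the projection are left untouched (null in this toy example); in the real batch they keep the column vectors pre-allocated by the VectorizedRowBatchCtx, as the comment in the hunk notes.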

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java?rev=1670992&r1=1670991&r2=1670992&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java Thu Apr  2 23:32:01 2015
@@ -24,32 +24,20 @@ import org.apache.hadoop.hive.llap.count
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BinaryStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.BooleanStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ByteStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.CharacterStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DateStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DecimalStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DoubleStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.FloatStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.IntStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.LongStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ShortStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.StringStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.TimestampStreamReader;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.EncodedRecordReaderImplFactory;
 import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImplFactory;
 import org.apache.hadoop.hive.ql.io.orc.WriterImpl;
 
 public class OrcEncodedDataConsumer extends EncodedDataConsumer<OrcBatchKey> {
-  private RecordReaderImpl.TreeReader[] columnReaders;
+  private RecordReaderImplFactory.TreeReader[] columnReaders;
   private int previousStripeIndex = -1;
   private OrcFileMetadata fileMetadata; // We assume one request is only for one file.
   private CompressionCodec codec;
@@ -95,10 +83,10 @@ public class OrcEncodedDataConsumer exte
       int batchSize = VectorizedRowBatch.DEFAULT_SIZE;
       int numCols = batch.columnIxs.length;
       if (columnReaders == null || !sameStripe) {
-        this.columnReaders = createTreeReaders(numCols, batch, codec, fileMetadata, stripeMetadata);
+        this.columnReaders = EncodedRecordReaderImplFactory.createEncodedTreeReader(numCols,
+            fileMetadata.getTypes(), stripeMetadata.getEncodings(), batch, codec, skipCorrupt);
       } else {
-        repositionInStreams(this.columnReaders, batch, sameStripe, numCols, fileMetadata,
-            stripeMetadata);
+        repositionInStreams(this.columnReaders, batch, sameStripe, numCols, stripeMetadata);
       }
       previousStripeIndex = currentStripeIndex;
 
@@ -128,304 +116,17 @@ public class OrcEncodedDataConsumer exte
     }
   }
 
-  RecordReaderImpl.TreeReader[] createTreeReaders(int numCols,
-      EncodedColumnBatch<OrcBatchKey> batch,
-      CompressionCodec codec,
-      OrcFileMetadata fileMetadata,
-      OrcStripeMetadata stripeMetadata) throws IOException {
-    long file = batch.batchKey.file;
-    RecordReaderImpl.TreeReader[] treeReaders = new RecordReaderImpl.TreeReader[numCols];
-
-    for (int i = 0; i < numCols; i++) {
-      int columnIndex = batch.columnIxs[i];
-      int rowGroupIndex = batch.batchKey.rgIx;
-      EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
-      OrcProto.Type columnType = fileMetadata.getTypes().get(columnIndex);
-
-      // EncodedColumnBatch is already decompressed, we don't really need to pass codec.
-      // But we need to know if the original data is compressed or not. This is used to skip
-      // positions in row index properly. If the file is originally compressed,
-      // then 1st position (compressed offset) in row index should be skipped to get
-      // uncompressed offset, else 1st position should not be skipped.
-      // TODO: there should be a better way to do this, code just needs to be modified
-      OrcProto.ColumnEncoding columnEncoding = stripeMetadata.getEncodings().get(columnIndex);
-      OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
-      OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
-
-      // stream buffers are arranged in enum order of stream kind
-      EncodedColumnBatch.StreamBuffer present = null;
-      EncodedColumnBatch.StreamBuffer data = null;
-      EncodedColumnBatch.StreamBuffer dictionary = null;
-      EncodedColumnBatch.StreamBuffer lengths = null;
-      EncodedColumnBatch.StreamBuffer secondary = null;
-      for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
-        switch(streamBuffer.streamKind) {
-          case 0:
-            // PRESENT stream
-            present = streamBuffer;
-            break;
-          case 1:
-            // DATA stream
-            data = streamBuffer;
-            break;
-          case 2:
-            // LENGTH stream
-            lengths = streamBuffer;
-            break;
-          case 3:
-            // DICTIONARY_DATA stream
-            dictionary = streamBuffer;
-            break;
-          case 5:
-            // SECONDARY stream
-            secondary = streamBuffer;
-            break;
-          default:
-            throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
-        }
-      }
-
-      switch (columnType.getKind()) {
-        case BINARY:
-          treeReaders[i] = BinaryStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setLengthStream(lengths)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case BOOLEAN:
-          treeReaders[i] = BooleanStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case BYTE:
-          treeReaders[i] = ByteStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case SHORT:
-          treeReaders[i] = ShortStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case INT:
-          treeReaders[i] = IntStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case LONG:
-          treeReaders[i] = LongStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .skipCorrupt(skipCorrupt)
-              .build();
-          break;
-        case FLOAT:
-          treeReaders[i] = FloatStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case DOUBLE:
-          treeReaders[i] = DoubleStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .build();
-          break;
-        case CHAR:
-        case VARCHAR:
-          treeReaders[i] = CharacterStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setMaxLength(columnType.getMaximumLength())
-              .setCharacterType(columnType)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setLengthStream(lengths)
-              .setDictionaryStream(dictionary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case STRING:
-          treeReaders[i] = StringStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setLengthStream(lengths)
-              .setDictionaryStream(dictionary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case DECIMAL:
-          treeReaders[i] = DecimalStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPrecision(columnType.getPrecision())
-              .setScale(columnType.getScale())
-              .setPresentStream(present)
-              .setValueStream(data)
-              .setScaleStream(secondary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        case TIMESTAMP:
-          treeReaders[i] = TimestampStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setSecondsStream(data)
-              .setNanosStream(secondary)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .skipCorrupt(skipCorrupt)
-              .build();
-          break;
-        case DATE:
-          treeReaders[i] = DateStreamReader.builder()
-              .setFileId(file)
-              .setColumnIndex(columnIndex)
-              .setPresentStream(present)
-              .setDataStream(data)
-              .setCompressionCodec(codec)
-              .setColumnEncoding(columnEncoding)
-              .build();
-          break;
-        default:
-          throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
-      }
-      treeReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry));
-    }
-    return treeReaders;
-  }
-
-  private void repositionInStreams(RecordReaderImpl.TreeReader[] columnReaders,
+  private void repositionInStreams(RecordReaderImplFactory.TreeReader[] columnReaders,
       EncodedColumnBatch<OrcBatchKey> batch, boolean sameStripe, int numCols,
-      OrcFileMetadata fileMetadata, OrcStripeMetadata stripeMetadata) throws IOException {
+      OrcStripeMetadata stripeMetadata) throws IOException {
     for (int i = 0; i < numCols; i++) {
       int columnIndex = batch.columnIxs[i];
       int rowGroupIndex = batch.batchKey.rgIx;
       EncodedColumnBatch.StreamBuffer[] streamBuffers = batch.columnData[i];
-      OrcProto.Type columnType = fileMetadata.getTypes().get(columnIndex);
       OrcProto.RowIndex rowIndex = stripeMetadata.getRowIndexes()[columnIndex];
       OrcProto.RowIndexEntry rowIndexEntry = rowIndex.getEntry(rowGroupIndex);
-
-      // stream buffers are arranged in enum order of stream kind
-      EncodedColumnBatch.StreamBuffer present = null;
-      EncodedColumnBatch.StreamBuffer data = null;
-      EncodedColumnBatch.StreamBuffer dictionary = null;
-      EncodedColumnBatch.StreamBuffer lengths = null;
-      EncodedColumnBatch.StreamBuffer secondary = null;
-      for (EncodedColumnBatch.StreamBuffer streamBuffer : streamBuffers) {
-        switch(streamBuffer.streamKind) {
-          case 0:
-            // PRESENT stream
-            present = streamBuffer;
-            break;
-          case 1:
-            // DATA stream
-            data = streamBuffer;
-            break;
-          case 2:
-            // LENGTH stream
-            lengths = streamBuffer;
-            break;
-          case 3:
-            // DICTIONARY_DATA stream
-            dictionary = streamBuffer;
-            break;
-          case 5:
-            // SECONDARY stream
-            secondary = streamBuffer;
-            break;
-          default:
-            throw new IOException("Unexpected stream kind: " + streamBuffer.streamKind);
-        }
-      }
-
-      switch (columnType.getKind()) {
-        case BINARY:
-          ((BinaryStreamReader)columnReaders[i]).setBuffers(present, data, lengths);
-          break;
-        case BOOLEAN:
-          ((BooleanStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        case BYTE:
-          ((ByteStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        case SHORT:
-          ((ShortStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        case INT:
-          ((IntStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        case LONG:
-          ((LongStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        case FLOAT:
-          ((FloatStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        case DOUBLE:
-          ((DoubleStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        case CHAR:
-        case VARCHAR:
-          ((CharacterStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary,
-              sameStripe);
-          break;
-        case STRING:
-          ((StringStreamReader)columnReaders[i]).setBuffers(present, data, lengths, dictionary,
-              sameStripe);
-          break;
-        case DECIMAL:
-          ((DecimalStreamReader)columnReaders[i]).setBuffers(present, data, secondary);
-          break;
-        case TIMESTAMP:
-          ((TimestampStreamReader)columnReaders[i]).setBuffers(present, data, secondary);
-          break;
-        case DATE:
-          ((DateStreamReader)columnReaders[i]).setBuffers(present, data);
-          break;
-        default:
-          throw new UnsupportedOperationException("Data type not supported yet! " + columnType);
-      }
-
-      columnReaders[i].seek(StreamUtils.getPositionProvider(rowIndexEntry));
+      columnReaders[i].setBuffers(streamBuffers, sameStripe);
+      columnReaders[i].seek(new RecordReaderImpl.PositionProviderImpl(rowIndexEntry));
     }
   }
 

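The removed createTreeReaders and repositionInStreams bodies above give the bookkeeping that this commit moves behind EncodedRecordReaderImplFactory: each column's StreamBuffer array is first sorted into PRESENT/DATA/LENGTH/DICTIONARY_DATA/SECONDARY slots by numeric stream kind, then a type-specific reader is built (or repositioned) and seeked with a position provider derived from the row-index entry. A small, self-contained sketch of that stream-kind dispatch is below; the Kind enum, StreamBuffer class, and sortByKind helper are hypothetical stand-ins, while the numeric codes 0, 1, 2, 3 and 5 match the removed switch (kind 4 is not handled there).

    import java.util.EnumMap;
    import java.util.Map;

    public class StreamDispatchSketch {
      // Hypothetical stand-in for the ORC stream kinds handled by the removed switch.
      enum Kind {
        PRESENT(0), DATA(1), LENGTH(2), DICTIONARY_DATA(3), SECONDARY(5);
        final int code;
        Kind(int code) { this.code = code; }
        static Kind fromCode(int code) {
          for (Kind k : values()) {
            if (k.code == code) return k;
          }
          throw new IllegalArgumentException("Unexpected stream kind: " + code);
        }
      }

      // Hypothetical stand-in for EncodedColumnBatch.StreamBuffer.
      static class StreamBuffer {
        final int streamKind;
        StreamBuffer(int streamKind) { this.streamKind = streamKind; }
      }

      // Sort one column's buffers into named slots, as the removed loop did with
      // its local present/data/lengths/dictionary/secondary variables.
      static Map<Kind, StreamBuffer> sortByKind(StreamBuffer[] buffers) {
        Map<Kind, StreamBuffer> slots = new EnumMap<>(Kind.class);
        for (StreamBuffer b : buffers) {
          slots.put(Kind.fromCode(b.streamKind), b);
        }
        return slots;
      }

      public static void main(String[] args) {
        StreamBuffer[] column = {
            new StreamBuffer(0), new StreamBuffer(1), new StreamBuffer(2)
        };
        System.out.println(sortByKind(column).keySet());  // [PRESENT, DATA, LENGTH]
      }
    }

After this dispatch, the committed code hands the raw buffers straight to columnReaders[i].setBuffers(streamBuffers, sameStripe) and seeks with RecordReaderImpl.PositionProviderImpl, so the per-type switch no longer lives in OrcEncodedDataConsumer.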
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1670992&r1=1670991&r2=1670992&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Thu Apr  2 23:32:01 2015
@@ -673,35 +673,4 @@ public class VectorizedRowBatchCtx {
       throw new Error("Cannot allocate vector column for " + type);
     }
   }
-
-  public VectorColumnAssign[] buildObjectAssigners(VectorizedRowBatch outputBatch)
-        throws HiveException {
-    List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
-    assert outputBatch.numCols == fieldRefs.size();
-    VectorColumnAssign[] assigners = new VectorColumnAssign[fieldRefs.size()];
-    for(int i = 0; i < assigners.length; ++i) {
-        StructField fieldRef = fieldRefs.get(i);
-        ObjectInspector fieldOI = fieldRef.getFieldObjectInspector();
-        assigners[i] = VectorColumnAssignFactory.buildObjectAssign(
-                outputBatch, i, fieldOI);
-    }
-    return assigners;
-  }
-
-  public int[] getIncludedColumnIndexes() {
-    int colCount = (colsToInclude == null)
-        ? rowOI.getAllStructFieldRefs().size() : colsToInclude.size();
-    int[] result = new int[colCount];
-    if (colsToInclude == null) {
-      for (int i = 0; i < result.length; ++i) {
-        result[i] = i;
-      }
-    } else {
-      int i = -1;
-      for (int colIx : colsToInclude) {
-        result[++i] = colIx;
-      }
-    }
-    return result;
-  }
 }


