hive-commits mailing list archives

From: prasan...@apache.org
Subject: svn commit: r1659113 - in /hive/branches/llap: llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/ llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/ ll...
Date: Thu, 12 Feb 2015 01:04:55 GMT
Author: prasanthj
Date: Thu Feb 12 01:04:55 2015
New Revision: 1659113

URL: http://svn.apache.org/r1659113
Log:
HIVE-9419p3: LLAP: ORC decoding of row-groups (Partial patch3)

Added:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java
Removed:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/readers/
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/streams/
Modified:
    hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
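
The new readers wrap the existing ORC TreeReaders behind builders that seek to a row-group when constructed. A minimal sketch of how a caller might assemble one -- the parameter values are hypothetical placeholders; only the builder methods themselves come from this patch:

    import java.io.IOException;

    import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
    import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DoubleStreamReader;
    import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
    import org.apache.hadoop.hive.ql.io.orc.OrcProto;

    public class ReaderSketch {
      // Hypothetical helper: build a positioned reader for one column's
      // row-group from already-cached stream buffers.
      static DoubleStreamReader buildReader(String fileName, int colIx,
          EncodedColumnBatch.StreamBuffer presentBuffer,
          EncodedColumnBatch.StreamBuffer dataBuffer,
          CompressionCodec codec, int bufferSize,
          OrcProto.RowIndexEntry rowIndexEntry) throws IOException {
        return DoubleStreamReader.builder()
            .setFileName(fileName)
            .setColumnIndex(colIx)
            .setPresentStream(presentBuffer) // may be null if the column has no nulls
            .setDataStream(dataBuffer)
            .setCompressionCodec(codec)      // null for uncompressed files
            .setBufferSize(bufferSize)
            .setRowIndex(rowIndexEntry)      // build() seeks the streams to this entry
            .build();
      }
    }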

Modified: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java?rev=1659113&r1=1659112&r2=1659113&view=diff
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java (original)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java Thu Feb 12 01:04:55 2015
@@ -28,11 +28,11 @@ import org.apache.hadoop.hive.llap.Consu
 import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
 import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
 import org.apache.hadoop.hive.llap.io.api.orc.OrcBatchKey;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.DoubleStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.FloatStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.IntStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.LongStreamReader;
-import org.apache.hadoop.hive.llap.io.decode.orc.streams.ShortStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.DoubleStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.FloatStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.IntStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.LongStreamReader;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.readers.ShortStreamReader;
 import org.apache.hadoop.hive.llap.io.encoded.EncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.encoded.OrcEncodedDataProducer;
 import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java?rev=1659113&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/StreamUtils.java Thu Feb 12 01:04:55 2015
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Utility methods for creating ORC InStreams over cached stream buffers.
+ */
+public class StreamUtils {
+
+  public static InStream createInStream(String streamName, String fileName, CompressionCodec codec,
+      int bufferSize, EncodedColumnBatch.StreamBuffer streamBuffer) throws IOException {
+    int numBuffers = streamBuffer.cacheBuffers.size();
+    List<DiskRange> input = new ArrayList<>(numBuffers);
+    int totalLength = 0;
+    for (int i = 0; i < numBuffers; i++) {
+      LlapMemoryBuffer lmb = streamBuffer.cacheBuffers.get(i);
+      input.add(new RecordReaderImpl.CacheChunk(lmb, 0, lmb.byteBuffer.limit()));
+      totalLength += lmb.byteBuffer.limit();
+    }
+    return InStream.create(fileName, streamName, input, totalLength, codec, bufferSize, null);
+  }
+}
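
For context on the accounting above: createInStream presents the N cached pieces of one ORC stream as a single logical stream whose length is the sum of each piece's limit(). A standalone java.nio illustration of just that accounting (not LLAP code; the buffer sizes are made up):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    public class TotalLengthSketch {
      public static void main(String[] args) {
        // Two cached pieces of one stream, 8 and 5 bytes long.
        List<ByteBuffer> cached = Arrays.asList(ByteBuffer.allocate(8), ByteBuffer.allocate(5));
        int totalLength = 0;
        for (ByteBuffer piece : cached) {
          totalLength += piece.limit(); // each piece contributes [0, limit()) bytes
        }
        System.out.println(totalLength); // 13 -- the length handed to InStream.create
      }
    }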

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java?rev=1659113&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/DoubleStreamReader.java Thu Feb 12 01:04:55 2015
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC double columns, positioned via the given row index entry.
+ */
+public class DoubleStreamReader extends RecordReaderImpl.DoubleTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private DoubleStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public DoubleStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new DoubleStreamReader(columnIndex, present, data, compressionCodec != null, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java?rev=1659113&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/FloatStreamReader.java Thu Feb 12 01:04:55 2015
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC float columns, positioned via the given row index entry.
+ */
+public class FloatStreamReader extends RecordReaderImpl.FloatTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private FloatStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public FloatStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new FloatStreamReader(columnIndex, present, data, compressionCodec != null, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java?rev=1659113&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/IntStreamReader.java Thu Feb 12 01:04:55 2015
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC int columns, positioned via the given row index entry.
+ */
+public class IntStreamReader extends RecordReaderImpl.IntTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private IntStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.ColumnEncoding.Kind kind,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, kind);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
+      this.columnEncodingKind = kind;
+      return this;
+    }
+
+    public IntStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new IntStreamReader(columnIndex, present, data,
+          compressionCodec != null, columnEncodingKind, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java?rev=1659113&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/LongStreamReader.java Thu Feb 12 01:04:55 2015
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC long columns, positioned via the given row index entry.
+ */
+public class LongStreamReader extends RecordReaderImpl.LongTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private LongStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.ColumnEncoding.Kind kind, boolean skipCorrupt,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, kind, skipCorrupt);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+    private boolean skipCorrupt;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
+      this.columnEncodingKind = kind;
+      return this;
+    }
+
+    public StreamReaderBuilder skipCorrupt(boolean skipCorrupt) {
+      this.skipCorrupt = skipCorrupt;
+      return this;
+    }
+
+    public LongStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new LongStreamReader(columnIndex, present, data,
+          compressionCodec != null, columnEncodingKind, skipCorrupt, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Added: hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java?rev=1659113&view=auto
==============================================================================
--- hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java (added)
+++ hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/orc/stream/readers/ShortStreamReader.java Thu Feb 12 01:04:55 2015
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.io.decode.orc.stream.readers;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
+import org.apache.hadoop.hive.llap.io.decode.orc.stream.StreamUtils;
+import org.apache.hadoop.hive.ql.io.orc.CompressionCodec;
+import org.apache.hadoop.hive.ql.io.orc.InStream;
+import org.apache.hadoop.hive.ql.io.orc.OrcProto;
+import org.apache.hadoop.hive.ql.io.orc.PositionProvider;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
+
+/**
+ * Stream reader for ORC short columns, positioned via the given row index entry.
+ */
+public class ShortStreamReader extends RecordReaderImpl.ShortTreeReader {
+  private boolean isFileCompressed;
+  private OrcProto.RowIndexEntry rowIndex;
+
+  private ShortStreamReader(int columnId, InStream present,
+      InStream data, boolean isFileCompressed,
+      OrcProto.ColumnEncoding.Kind kind,
+      OrcProto.RowIndexEntry rowIndex) throws IOException {
+    super(columnId, present, data, kind);
+    this.isFileCompressed = isFileCompressed;
+    this.rowIndex = rowIndex;
+
+    // position the readers based on the specified row index
+    PositionProvider positionProvider = new RecordReaderImpl.PositionProviderImpl(rowIndex);
+    seek(positionProvider);
+  }
+
+  public void seek(PositionProvider positionProvider) throws IOException {
+    super.seek(positionProvider);
+  }
+
+  public static class StreamReaderBuilder {
+    private String fileName;
+    private int columnIndex;
+    private EncodedColumnBatch.StreamBuffer presentStream;
+    private EncodedColumnBatch.StreamBuffer dataStream;
+    private CompressionCodec compressionCodec;
+    private int bufferSize;
+    private OrcProto.RowIndexEntry rowIndex;
+    private OrcProto.ColumnEncoding.Kind columnEncodingKind;
+
+    public StreamReaderBuilder setFileName(String fileName) {
+      this.fileName = fileName;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnIndex(int columnIndex) {
+      this.columnIndex = columnIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setPresentStream(EncodedColumnBatch.StreamBuffer presentStream) {
+      this.presentStream = presentStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setDataStream(EncodedColumnBatch.StreamBuffer dataStream) {
+      this.dataStream = dataStream;
+      return this;
+    }
+
+    public StreamReaderBuilder setCompressionCodec(CompressionCodec compressionCodec) {
+      this.compressionCodec = compressionCodec;
+      return this;
+    }
+
+    public StreamReaderBuilder setBufferSize(int bufferSize) {
+      this.bufferSize = bufferSize;
+      return this;
+    }
+
+    public StreamReaderBuilder setRowIndex(OrcProto.RowIndexEntry rowIndex) {
+      this.rowIndex = rowIndex;
+      return this;
+    }
+
+    public StreamReaderBuilder setColumnEncodingKind(OrcProto.ColumnEncoding.Kind kind) {
+      this.columnEncodingKind = kind;
+      return this;
+    }
+
+    public ShortStreamReader build() throws IOException {
+      InStream present = null;
+      if (presentStream != null) {
+        present = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.PRESENT.name(), fileName, null, bufferSize,
+                presentStream);
+      }
+
+      InStream data = null;
+      if (dataStream != null) {
+        data = StreamUtils
+            .createInStream(OrcProto.Stream.Kind.DATA.name(), fileName, null, bufferSize,
+                dataStream);
+      }
+
+      return new ShortStreamReader(columnIndex, present, data,
+          compressionCodec != null, columnEncodingKind, rowIndex);
+    }
+  }
+
+  public static StreamReaderBuilder builder() {
+    return new StreamReaderBuilder();
+  }
+}

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1659113&r1=1659112&r2=1659113&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Feb 12 01:04:55 2015
@@ -614,12 +614,13 @@ public abstract class InStream extends I
 
     // 4. Now decompress (or copy) the data into cache buffers.
     for (ProcCacheChunk chunk : toDecompress) {
+      int startPos = chunk.buffer.byteBuffer.position();
       if (chunk.isCompressed) {
         decompressDirectBuffer(codec, chunk.originalData, chunk.buffer.byteBuffer);
       } else {
         chunk.buffer.byteBuffer.put(chunk.originalData); // Copy uncompressed data to cache.
       }
-      chunk.buffer.byteBuffer.position(0);
+      chunk.buffer.byteBuffer.position(startPos);
       chunk.originalData = null;
     }
 
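
A note on the one-line fix above: ByteBuffer.put advances the buffer's position, and rewinding to absolute 0 is only correct when the chunk's data begins at the start of the buffer. Recording startPos first keeps the rewind correct when the data begins at an offset, as with shared cache buffers. A standalone java.nio sketch of the behavior (not LLAP code; the offset is made up):

    import java.nio.ByteBuffer;

    public class RewindSketch {
      public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(16);
        bb.position(4);                    // suppose this chunk's data begins at offset 4
        int startPos = bb.position();      // record it, as the patched loop does
        bb.put(new byte[] {1, 2, 3});      // put() advances the position to 7
        bb.position(startPos);             // rewind to the chunk start
        System.out.println(bb.position()); // prints 4; position(0) would point before the data
      }
    }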


