drill-commits mailing list archives

From par...@apache.org
Subject [01/10] drill git commit: DRILL-4800: Use a buffering input stream in the Parquet reader
Date Sat, 05 Nov 2016 00:11:28 GMT
Repository: drill
Updated Branches:
  refs/heads/master 190d5d46d -> 9411b26ec


DRILL-4800: Use a buffering input stream in the Parquet reader


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/fe2334ee
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/fe2334ee
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/fe2334ee

Branch: refs/heads/master
Commit: fe2334ee0234aa37637f17d7833e734333b73892
Parents: 190d5d4
Author: Parth Chandra <parthc@apache.org>
Authored: Fri Jun 10 14:56:41 2016 -0700
Committer: Parth Chandra <parthc@apache.org>
Committed: Wed Nov 2 17:23:27 2016 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   4 +
 .../apache/drill/exec/ops/OperatorContext.java  |   6 +-
 .../drill/exec/ops/OperatorContextImpl.java     |   5 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../store/parquet/columnreaders/PageReader.java | 122 +++--
 .../columnreaders/ParquetRecordReader.java      |  16 +-
 .../BufferedDirectBufInputStream.java           | 460 +++++++++++++++++++
 .../util/filereader/DirectBufInputStream.java   | 166 +++++++
 8 files changed, 727 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 21015bb..ba6b084 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -147,6 +147,10 @@ public interface ExecConstants {
   String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
   OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP, false);
 
+  // Use a buffering reader for parquet page reader
+  String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread";
+  OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true);
+
   OptionValidator COMPILE_SCALAR_REPLACEMENT = new BooleanValidator("exec.compile.scalar_replacement", false);
 
   String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
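
For reference, the new option is consulted at run time through the fragment's option manager, exactly as the PageReader change below does. A minimal sketch, not part of this patch; it assumes a FragmentContext named fragmentContext is in scope:

    // Sketch only: pick a page reader implementation based on the new option.
    boolean useBufferedReader = fragmentContext.getOptions()
        .getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
    if (useBufferedReader) {
      // wrap the file stream in a BufferedDirectBufInputStream (see PageReader.java below)
    } else {
      // fall back to the plain DirectBufInputStream
    }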

http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 2c169a4..33fa288 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -23,6 +23,8 @@ import io.netty.buffer.DrillBuf;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -43,6 +45,8 @@ public abstract class OperatorContext {
 
   public abstract OperatorStats getStats();
 
+  public abstract ExecutorService getExecutor();
+
   public abstract ExecutionControls getExecutionControls();
 
   public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
@@ -72,4 +76,4 @@ public abstract class OperatorContext {
     return i;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 8217afd..85f0ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -95,6 +95,11 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return manager.getManagedBuffer(size);
   }
 
+  // Allow an operator to use the thread pool
+  public ExecutorService getExecutor() {
+    return executor;
+  }
+
   public ExecutionControls getExecutionControls() {
     return executionControls;
   }
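
The new getExecutor() accessor exposes the execution thread pool to individual operators. A minimal sketch of how an operator might hand work to that pool, not part of this patch; the operatorContext variable and the prefetch task are assumed:

    // Sketch only: schedule background work on the shared executor.
    ExecutorService pool = operatorContext.getExecutor();
    Future<?> prefetch = pool.submit(new Runnable() {
      @Override
      public void run() {
        // e.g. read the next page into a buffer ahead of time
      }
    });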

http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index f272c9d..8b67fdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -99,6 +99,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoCloseable {
       ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR,
       ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
       ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
+      ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
       ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR,
       ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index e7b4b6e..078e4ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -17,18 +17,17 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import static org.apache.parquet.column.Encoding.valueOf;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.store.parquet.ColumnDataReader;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
 import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.util.filereader.DirectBufInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +36,7 @@ import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.ValuesType;
 import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.dictionary.DictionaryValuesReader;
 import org.apache.parquet.format.PageHeader;
@@ -48,22 +48,26 @@ import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.PrimitiveType;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
+import static org.apache.parquet.column.Encoding.valueOf;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics;
 
 // class to keep track of the read position of variable length columns
 final class PageReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PageReader.class);
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(
+      org.apache.drill.exec.store.parquet.columnreaders.PageReader.class);
 
   public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter;
 
-  private final ColumnReader<?> parentColumnReader;
-  private final ColumnDataReader dataReader;
-
-  // buffer to store bytes of current page
+  private final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentColumnReader;
+  //private final ColumnDataReader dataReader;
+  private final DirectBufInputStream dataReader;
+  // buffer to store bytes of current page
   DrillBuf pageData;
 
   // for variable length data we need to keep track of our current position in the page data
@@ -108,8 +112,8 @@ final class PageReader {
 
   private final ParquetReaderStats stats;
 
-  PageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
-    throws ExecutionSetupException{
+  PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
+    throws ExecutionSetupException {
     this.parentColumnReader = parentStatus;
     allocatedDictionaryBuffers = new ArrayList<ByteBuf>();
     codecFactory = parentColumnReader.parentReader.getCodecFactory();
@@ -117,8 +121,22 @@ final class PageReader {
     long start = columnChunkMetaData.getFirstDataPageOffset();
     try {
       inputStream  = fs.open(path);
-      this.dataReader = new ColumnDataReader(inputStream, start, columnChunkMetaData.getTotalSize());
-      loadDictionaryIfExists(parentStatus, columnChunkMetaData, inputStream);
+      BufferAllocator allocator =  parentColumnReader.parentReader.getOperatorContext().getAllocator();
+      //TODO: make read batch size configurable
+      columnChunkMetaData.getTotalUncompressedSize();
+      boolean useBufferedReader  = parentColumnReader.parentReader.getFragmentContext().getOptions()
+          .getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
+      if (useBufferedReader) {
+        this.dataReader = new BufferedDirectBufInputStream(inputStream, allocator, path.getName(),
+            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), 8 * 1024 * 1024,
+            true);
+      } else {
+        this.dataReader = new DirectBufInputStream(inputStream, allocator, path.getName(),
+            columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), true);
+      }
+      dataReader.init();
+
+      loadDictionaryIfExists(parentStatus, columnChunkMetaData, dataReader);
 
     } catch (IOException e) {
+      throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
@@ -127,16 +145,16 @@ final class PageReader {
 
   }
 
-  private void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
-      final ColumnChunkMetaData columnChunkMetaData, final FSDataInputStream f) throws IOException {
+  private void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentStatus,
+      final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
     if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
-      f.seek(columnChunkMetaData.getDictionaryPageOffset());
-      long start=f.getPos();
+      dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
+      long start=dataReader.getPos();
       timer.start();
       final PageHeader pageHeader = Util.readPageHeader(f);
       long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
-      long pageHeaderBytes=f.getPos()-start;
+      long pageHeaderBytes=dataReader.getPos()-start;
       this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
       assert pageHeader.type == PageType.DICTIONARY_PAGE;
       readDictionaryPage(pageHeader, parentStatus);
@@ -148,8 +166,8 @@ final class PageReader {
     int compressedSize = pageHeader.getCompressed_page_size();
     int uncompressedSize = pageHeader.getUncompressed_page_size();
 
-    final DrillBuf dictionaryData = allocateDictionaryBuffer(uncompressedSize);
-    readPage(pageHeader, compressedSize, uncompressedSize, dictionaryData);
+    final DrillBuf dictionaryData = readPage(pageHeader, compressedSize, uncompressedSize);
+    allocatedDictionaryBuffers.add(dictionaryData);
 
     DictionaryPage page = new DictionaryPage(
         asBytesInput(dictionaryData, 0, uncompressedSize),
@@ -160,34 +178,41 @@ final class PageReader {
     this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
   }
 
-  public void readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize, DrillBuf dest) throws IOException {
+  public DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize) throws IOException {
+    DrillBuf pageDataBuf = null;
     Stopwatch timer = Stopwatch.createUnstarted();
     long timeToRead;
-    long start=inputStream.getPos();
+    long start=dataReader.getPos();
     if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
       timer.start();
-      dataReader.loadPage(dest, compressedSize);
+      pageDataBuf = dataReader.getNext(compressedSize);
       timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
       this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize);
     } else {
-      final DrillBuf compressedData = allocateTemporaryBuffer(compressedSize);
+      DrillBuf compressedData = null;
+      pageDataBuf=allocateTemporaryBuffer(uncompressedSize);
+
       try {
       timer.start();
-      dataReader.loadPage(compressedData, compressedSize);
+      compressedData = dataReader.getNext(compressedSize);
+       // dataReader.loadPage(compressedData, compressedSize);
       timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
       timer.reset();
       this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize);
-      start = inputStream.getPos();
+      start=dataReader.getPos();
       timer.start();
       codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData
           .getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize,
-          dest.nioBuffer(0, uncompressedSize), uncompressedSize);
+          pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize);
         timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
         this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, uncompressedSize);
       } finally {
-        compressedData.release();
+        if(compressedData != null) {
+          compressedData.release();
+        }
       }
     }
+    return pageDataBuf;
   }
 
   public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) throws IOException {
@@ -198,7 +223,7 @@ final class PageReader {
    * Grab the next page.
    *
    * @return - if another page was present
-   * @throws java.io.IOException
+   * @throws IOException
    */
   public boolean next() throws IOException {
     Stopwatch timer = Stopwatch.createUnstarted();
@@ -217,11 +242,12 @@ final class PageReader {
     // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
     // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
     do {
-      long start=inputStream.getPos();
+      long start=dataReader.getPos();
       timer.start();
-      pageHeader = dataReader.readPageHeader();
+      pageHeader = Util.readPageHeader(dataReader);
       long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS);
-      this.updateStats(pageHeader, "Page Header Read", start, timeToRead, 0,0);
+      long pageHeaderBytes=dataReader.getPos()-start;
+      this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes);
       logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}","Page Header Read","",
           this.parentColumnReader.parentReader.hadoopPath,
           this.parentColumnReader.columnDescriptor.toString(), start, 0, 0, timeToRead);
@@ -233,12 +259,16 @@ final class PageReader {
 
     //TODO: Handle buffer allocation exception
 
-    allocatePageData(pageHeader.getUncompressed_page_size());
+    //allocatePageData(pageHeader.getUncompressed_page_size());
     int compressedSize = pageHeader.getCompressed_page_size();
     int uncompressedSize = pageHeader.getUncompressed_page_size();
-    readPage(pageHeader, compressedSize, uncompressedSize, pageData);
+    pageData = readPage(pageHeader, compressedSize, uncompressedSize);
 
     currentPageCount = pageHeader.data_page_header.num_values;
+    final int uncompressedPageSize = pageHeader.uncompressed_page_size;
+    final Statistics<?> stats = fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader
+        .getColumnDescriptor().getType());
+
 
     final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding);
 
@@ -370,7 +400,11 @@ final class PageReader {
   }
 
   public void clear(){
-    this.dataReader.clear();
+    try {
+      this.dataReader.close();
+    } catch (IOException e) {
+      //TODO: Throw UserException
+    }
     // Free all memory, including fixed length types. (Data is being copied for all types not just var length types)
     //if(!this.parentColumnReader.isFixedLength) {
     clearBuffers();

http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index f095a8a..924887e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -118,12 +118,12 @@ public class ParquetRecordReader extends AbstractRecordReader {
   public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
 
   public ParquetRecordReader(FragmentContext fragmentContext,
-                             String path,
-                             int rowGroupIndex,
+      String path,
+      int rowGroupIndex,
                              long numRecordsToRead,
-                             FileSystem fs,
-                             CodecFactory codecFactory,
-                             ParquetMetadata footer,
+      FileSystem fs,
+      CodecFactory codecFactory,
+      ParquetMetadata footer,
                              List<SchemaPath> columns,
                              ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
                              throws ExecutionSetupException {
@@ -316,11 +316,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
       columnsToScan++;
       int dataTypeLength = getDataTypeLength(column, se);
       if (dataTypeLength == -1) {
-        allFieldsFixedLength = false;
-      } else {
+          allFieldsFixedLength = false;
+        } else {
         bitWidthAllFixedFields += dataTypeLength;
+        }
       }
-    }
 //    rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
 
     if (columnsToScan != 0  && allFieldsFixedLength) {

http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
new file mode 100644
index 0000000..6aa968a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.hadoop.Footer;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * <code>BufferedDirectBufInputStream</code>  reads from the
+ * underlying <code>InputStream</code> in blocks of data, into an
+ * internal buffer. The internal buffer is a direct memory backed
+ * buffer. The implementation is similar to the <code>BufferedInputStream</code>
+ * class except that the internal buffer is a Drillbuf and
+ * not a byte array. The mark and reset methods of the underlying
+ * <code>InputStream</code>are not supported.
+ */
+public class BufferedDirectBufInputStream extends DirectBufInputStream implements Closeable {
+
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class);
+
+  private static int defaultBufferSize = 8192 * 1024; // 8 MiB
+  private static int defaultTempBufferSize = 8192; // 8 KiB
+
+  /**
+   * The internal buffer to keep data read from the underlying inputStream.
+   * <code>internalBuffer[0]</code> through <code>internalBuffer[count-1]</code>
+   * contains data read from the underlying  input stream.
+   */
+  protected volatile DrillBuf internalBuffer; // the internal buffer
+
+  /**
+   * The number of valid bytes in <code>internalBuffer</code>.
+   * <code> count </code> is always in the range <code>[0,internalBuffer.capacity]</code>
+   * <code>internalBuffer[count-1]</code> is the last valid byte in the buffer.
+   */
+  protected int count;
+
+  /**
+   * The current read position in the buffer; the index of the next
+   * character to be read from the <code>internalBuffer</code> array.
+   * <p>
+   * This value is always in the range <code>[0,count]</code>.
+   * If <code>curPosInBuffer</code> is equal to <code>count</code> then we have read
+   * all the buffered data and the next read (or skip) will require more data to be read
+   * from the underlying input stream.
+   */
+  protected int curPosInBuffer;
+
+  protected long curPosInStream; // current offset in the input stream
+
+  private final int bufSize;
+
+  private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len)
+
+
+  private DrillBuf getBuf() throws IOException {
+    checkInputStreamState();
+    if (internalBuffer == null) {
+      throw new IOException("Input stream is closed.");
+    }
+    return this.internalBuffer;
+  }
+
+  /**
+   * Creates a <code>BufferedDirectBufInputStream</code>
+   * with the default (8 MiB) buffer size.
+   */
+  public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id,
+      long startOffset, long totalByteSize, boolean enableHints) {
+    this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints);
+  }
+
+  /**
+   * Creates a <code>BufferedDirectBufInputStream</code>
+   * with the specified buffer size.
+   */
+  public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id,
+      long startOffset, long totalByteSize, int bufSize, boolean enableHints) {
+    super(in, allocator, id, startOffset, totalByteSize, enableHints);
+    Preconditions.checkArgument(bufSize >= 0);
+    // We make the buffer size the smaller of the buffer size parameter or the total byte size,
+    // rounded up to the next highest power of two
+    int bSize = bufSize < (int) totalByteSize ? bufSize : (int) totalByteSize;
+    // round up to next power of 2
+    bSize--;
+    bSize |= bSize >>> 1;
+    bSize |= bSize >>> 2;
+    bSize |= bSize >>> 4;
+    bSize |= bSize >>> 8;
+    bSize |= bSize >>> 16;
+    bSize++;
+    this.bufSize = bSize;
+
+  }
+
+  @Override
+  public void init() throws UnsupportedOperationException, IOException {
+    super.init();
+    this.internalBuffer = this.allocator.buffer(this.bufSize);
+    this.tempBuffer = this.allocator.buffer(defaultTempBufferSize);
+    int bytesRead = getNextBlock();
+    if (bytesRead <= 0) {
+      throw new IOException("End of stream reached while initializing buffered reader.");
+    }
+  }
+
+  /**
+   * Read one more block from the underlying stream.
+   * Assumes we have reached the end of buffered data
+   * Assumes it is being called from a synchronized block.
+   * returns number of bytes read or -1 if EOF
+   */
+  private int getNextBlock() throws IOException {
+    Preconditions.checkState(this.curPosInBuffer >= this.count,
+        "Internal error: Buffered stream has not been consumed and trying to read more from
underlying stream");
+    checkInputStreamState();
+    DrillBuf buffer = getBuf();
+    buffer.clear();
+    this.count = this.curPosInBuffer = 0;
+
+    // We *cannot* rely on the totalByteSize being correct because
+    // metadata for Parquet files is incorrect. So we read as
+    // much as we can up to the size of the buffer
+    //int bytesToRead = buffer.capacity() <= (totalByteSize + startOffset - curPosInStream ) ?
+    //    buffer.Capacity() :
+    //    (int) (totalByteSize + startOffset - curPosInStream );
+    int bytesToRead = buffer.capacity();
+
+    ByteBuffer directBuffer = buffer.nioBuffer(curPosInBuffer, bytesToRead);
+    // The DFS can return *more* bytes than requested if the capacity of the buffer is greater.
+    // i.e 'n' can be greater than bytes requested which is pretty stupid and violates
+    // the API contract; but we still have to deal with it. So we make sure the size of the
+    // buffer is exactly the same as the number of bytes requested
+    int bytesRead = -1;
+    int nBytes = 0;
+    if (bytesToRead > 0) {
+      try {
+        nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead);
+      } catch (Exception e) {
+        logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage());
+      }
+      if (nBytes > 0) {
+        buffer.writerIndex(nBytes);
+        this.count = nBytes + this.curPosInBuffer;
+        this.curPosInStream = getInputStream().getPos();
+        bytesRead = nBytes;
+        logger.trace(
+            "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {},
Count: {}, " +
+            "CurPosInStream: {}, CurPosInBuffer: {}",
+            this.streamId, this.startOffset, this.totalByteSize, this.bufSize, bytesRead,
this.count,
+            this.curPosInStream, this.curPosInBuffer);
+      }
+    }
+    return this.count - this.curPosInBuffer;
+  }
+
+  // Reads from the internal Buffer into the output buffer
+  // May read less than the requested size if the remaining data in the buffer
+  // is less than the requested amount
+  private int readInternal(DrillBuf buf, int off, int len) throws IOException {
+    // check how many bytes are available in the buffer.
+    int bytesAvailable = this.count - this.curPosInBuffer;
+    if (bytesAvailable <= 0) {
+      // read more
+      int bytesRead = getNextBlock();
+      if (bytesRead <= 0) { // End of stream
+        return -1;
+      }
+    }
+    bytesAvailable = this.count - this.curPosInBuffer;
+    //copy into output buffer
+    int copyBytes = bytesAvailable < len ? bytesAvailable : len;
+    getBuf().getBytes(curPosInBuffer, buf, off, copyBytes);
+    buf.writerIndex(off + copyBytes);
+    this.curPosInBuffer += copyBytes;
+
+    return copyBytes;
+  }
+
+  // Reads from the internal Buffer into the output buffer
+  // May read less than the requested size if the remaining data in the buffer
+  // is less than the requested amount
+  // Does not make a copy but returns a slice of the internal buffer.
+  // Returns null if end of stream is reached
+  private DrillBuf readInternal(int off, int len) throws IOException {
+    // check how many bytes are available in the buffer.
+    int bytesAvailable = this.count - this.curPosInBuffer;
+    if (bytesAvailable <= 0) {
+      // read more
+      int bytesRead = getNextBlock();
+      if (bytesRead <= 0) { // End of stream
+        return null;
+      }
+    }
+    bytesAvailable = this.count - this.curPosInBuffer;
+    // return a slice as the  output
+    int bytesToRead = bytesAvailable < len ? bytesAvailable : len;
+    DrillBuf newBuf = this.getBuf().slice(off, bytesToRead);
+    newBuf.retain();
+    return newBuf;
+  }
+
+  /**
+   * Implements the <code>read</code> method of <code>InputStream</code>.
+   * Returns the next byte of data, or -1 if the end of the stream is reached.
+   */
+  public synchronized int read() throws IOException {
+    if (this.count - this.curPosInBuffer <= 0) {
+      int bytesRead = getNextBlock();
+      // reached end of stream
+      if (bytesRead <= 0) {
+        return -1;
+      }
+    }
+    this.curPosInBuffer++;
+    return getBuf().nioBuffer().get() & 0xff;
+  }
+
+  /**
+   * Has the same contract as {@link java.io.InputStream#read(byte[], int, int)},
+   * except that it reads into a DrillBuf.
+   */
+  public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
+    checkInputStreamState();
+    Preconditions.checkArgument((off >= 0) && (len >= 0) && (buf.capacity()) >= (off + len));
+    int bytesRead = 0;
+    do {
+      int readStart = off + bytesRead;
+      int lenToRead = len - bytesRead;
+      int nRead = readInternal(buf, readStart, lenToRead);
+      if (nRead <= 0) {// if End of stream
+        if (bytesRead == 0) { // no bytes read at all
+          return -1;
+        } else {
+          return bytesRead;
+        }
+      } else {
+        bytesRead += nRead;
+        // If the last read caused us to reach the end of stream
+        // we are done
+        InputStream input = in;
+        if (input != null && input.available() <= 0) {
+          return bytesRead;
+        }
+      }
+    } while (bytesRead < len);
+    return bytesRead;
+  }
+
+
+  @Override public int read(byte[] b) throws IOException {
+    return b.length == 1 ? read() : read(b, (int) 0, b.length);
+  }
+
+
+  @Override public int read(byte[] buf, int off, int len) throws IOException {
+    checkInputStreamState();
+    Preconditions.checkArgument((off >= 0) && (len >= 0) && (buf.length) >= (off + len));
+    int bytesRead = 0;
+    if (len == 0) {
+      return 0;
+    }
+    DrillBuf byteBuf;
+    if(len <= defaultTempBufferSize){
+      byteBuf = tempBuffer;
+    } else {
+      byteBuf = this.allocator.buffer(len);
+    }
+    do {
+      int readStart = off + bytesRead;
+      int lenToRead = len - bytesRead;
+      int nRead = readInternal(byteBuf, readStart, lenToRead);
+      if (nRead <= 0) {// if End of stream
+        if (bytesRead == 0) { // no bytes read at all
+          return -1;
+        } else {
+          return bytesRead;
+        }
+      } else {
+        byteBuf.nioBuffer().get(buf, off + bytesRead, len - bytesRead);
+        byteBuf.clear();
+        bytesRead += nRead;
+      }
+    } while (bytesRead < len);
+
+    if(len > defaultTempBufferSize){
+      byteBuf.release();
+    }
+
+    return bytesRead;
+  }
+
+
+  /**
+   * Has the same contract as {@link java.io.InputStream#skip(long)}.
+   * Skips up to the next n bytes.
+   * Skip may return with fewer than n bytes skipped.
+   */
+  @Override
+  public synchronized long skip(long n) throws IOException {
+    checkInputStreamState();
+    long bytesAvailable = this.count - this.curPosInBuffer;
+    long bytesSkipped = 0;
+    if (n <= 0) {
+      return 0;
+    }
+    if (bytesAvailable <= 0) {
+      checkInputStreamState();
+      bytesAvailable = getNextBlock();
+      if (bytesAvailable <= 0) { // End of stream
+        return 0;
+      }
+    }
+    bytesSkipped = bytesAvailable < n ? bytesAvailable : n;
+    this.curPosInBuffer += bytesSkipped;
+
+    return bytesSkipped;
+  }
+
+
+  @Override
+  public synchronized int available() throws IOException {
+    checkInputStreamState();
+    int bytesAvailable = this.count - this.curPosInBuffer;
+    int underlyingAvailable = getInputStream().available();
+    int available = bytesAvailable + underlyingAvailable;
+    if (available < 0) { // overflow
+      return Integer.MAX_VALUE;
+    }
+    return available;
+  }
+
+  @Override
+  public synchronized void mark(int readlimit) {
+    throw new UnsupportedOperationException("Mark/reset is not supported.");
+  }
+
+  @Override
+  public synchronized void reset() throws IOException {
+    throw new UnsupportedOperationException("Mark/reset is not supported.");
+  }
+
+  @Override
+  public boolean markSupported() {
+    return false;
+  }
+
+  /*
+    Returns the current position from the beginning of the underlying input stream
+   */
+  public long getPos() throws IOException {
+    return curPosInBuffer+startOffset;
+  }
+
+  public boolean hasRemainder() throws IOException {
+    return available() > 0;
+  }
+
+  public void close() throws IOException {
+    DrillBuf buffer;
+    InputStream inp;
+    if ((inp = in) != null) {
+      in = null;
+      inp.close();
+    }
+    if ((buffer = this.internalBuffer) != null) {
+      synchronized (this) {
+        this.internalBuffer = null;
+        buffer.release();
+      }
+    }
+    if ((buffer = this.tempBuffer) != null) {
+      synchronized (this) {
+        this.tempBuffer = null;
+        buffer.release();
+      }
+    }
+  }
+
+  public static void main(String[] args) {
+    final DrillConfig config = DrillConfig.create();
+    final BufferAllocator allocator = RootAllocatorFactory.newRoot(config);
+    final Configuration dfsConfig = new Configuration();
+    String fileName = args[0];
+    Path filePath = new Path(fileName);
+    final int BUFSZ = 8 * 1024 * 1024;
+    try {
+      List<Footer> footers = ParquetFileReader.readFooters(dfsConfig, filePath);
+      Footer footer = (Footer) footers.iterator().next();
+      FileSystem fs = FileSystem.get(dfsConfig);
+      int rowGroupIndex = 0;
+      List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();
+      for (BlockMetaData block : blocks) {
+        List<ColumnChunkMetaData> columns = block.getColumns();
+        for (ColumnChunkMetaData columnMetadata : columns) {
+          FSDataInputStream inputStream = fs.open(filePath);
+          long startOffset = columnMetadata.getStartingPos();
+          long totalByteSize = columnMetadata.getTotalSize();
+          String streamId = fileName + ":" + columnMetadata.toString();
+          BufferedDirectBufInputStream reader =
+              new BufferedDirectBufInputStream(inputStream, allocator, streamId, startOffset,
+                  totalByteSize, BUFSZ, true);
+          reader.init();
+          while (true) {
+            try {
+              DrillBuf buf = reader.getNext(BUFSZ - 1);
+              if (buf == null) {
+                break;
+              }
+              buf.release();
+            } catch (Exception e) {
+              e.printStackTrace();
+              break;
+            }
+          }
+          reader.close();
+        }
+      } // for each Block
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+    allocator.close();
+    return;
+  }
+}
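
The constructor above picks the smaller of the requested buffer size and the column chunk's total size, then rounds that value up to the next power of two with a bit-smearing sequence. A small, self-contained illustration of the rounding step (illustration only, not part of the patch):

public class RoundUpDemo {
  // Same bit-smearing round-up used in the BufferedDirectBufInputStream constructor.
  static int roundUpToPowerOfTwo(int n) {
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return n + 1;
  }

  public static void main(String[] args) {
    System.out.println(roundUpToPowerOfTwo(8 * 1024 * 1024)); // 8388608, already a power of two
    System.out.println(roundUpToPowerOfTwo(5 * 1000 * 1000)); // 8388608
  }
}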

http://git-wip-us.apache.org/repos/asf/drill/blob/fe2334ee/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
new file mode 100644
index 0000000..71c36e6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util.filereader;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.hadoop.fs.ByteBufferReadable;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.CompatibilityUtil;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+
+public class DirectBufInputStream extends FilterInputStream {
+
+  private static final org.slf4j.Logger logger =
+      org.slf4j.LoggerFactory.getLogger(DirectBufInputStream.class);
+
+  protected boolean enableHints = true;
+  protected String streamId; // a name for logging purposes only
+  protected BufferAllocator allocator;
+  /**
+   * The length of the data we expect to read. The caller may, in fact,
+   * ask for more or less bytes. However this is useful for providing hints where
+   * the underlying InputStream supports hints (e.g. fadvise)
+   */
+  protected final long totalByteSize;
+
+  /**
+   * The offset in the underlying stream to start reading from
+   */
+  protected final long startOffset;
+
+  public DirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset,
+      long totalByteSize, boolean enableHints) {
+    super(in);
+    Preconditions.checkArgument(startOffset >= 0);
+    Preconditions.checkArgument(totalByteSize >= 0);
+    this.streamId = id;
+    this.allocator = allocator;
+    this.startOffset = startOffset;
+    this.totalByteSize = totalByteSize;
+    this.enableHints = enableHints;
+  }
+
+  public void init() throws IOException, UnsupportedOperationException {
+    checkStreamSupportsByteBuffer();
+    if (enableHints) {
+      fadviseIfAvailable(getInputStream(), this.startOffset, this.totalByteSize);
+    }
+    getInputStream().seek(this.startOffset);
+    return;
+  }
+
+  public int read() throws IOException {
+    return getInputStream().read();
+  }
+
+  public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
+    buf.clear();
+    ByteBuffer directBuffer = buf.nioBuffer(0, len);
+    int lengthLeftToRead = len;
+    while (lengthLeftToRead > 0) {
+      lengthLeftToRead -= CompatibilityUtil.getBuf(getInputStream(), directBuffer, lengthLeftToRead);
+    }
+    buf.writerIndex(len);
+    return len;
+  }
+
+  public synchronized DrillBuf getNext(int bytes) throws IOException {
+    DrillBuf b = allocator.buffer(bytes);
+    int bytesRead = read(b, 0, bytes);
+    if (bytesRead <= -1) {
+      b.release();
+      return null;
+    }
+    return b;
+  }
+
+  public long getPos() throws IOException {
+    return getInputStream().getPos();
+  }
+
+  public boolean hasRemainder() throws IOException {
+    return getInputStream().available() > 0;
+  }
+
+  protected FSDataInputStream getInputStream() throws IOException {
+    // Make sure stream is open
+    checkInputStreamState();
+    return (FSDataInputStream) in;
+  }
+
+  protected void checkInputStreamState() throws IOException {
+    if (in == null) {
+      throw new IOException("Input stream is closed.");
+    }
+  }
+
+  protected void checkStreamSupportsByteBuffer() throws UnsupportedOperationException {
+    // Check input stream supports ByteBuffer
+    if (!(in instanceof ByteBufferReadable)) {
+      throw new UnsupportedOperationException("The input stream is not ByteBuffer readable.");
+    }
+  }
+
+  protected static void fadviseIfAvailable(FSDataInputStream inputStream, long off, long n) {
+    Method readAhead;
+    final Class adviceType;
+
+    try {
+      adviceType = Class.forName("org.apache.hadoop.fs.FSDataInputStream$FadviseType");
+    } catch (ClassNotFoundException e) {
+      logger.info("Unable to call fadvise due to: {}", e.toString());
+      readAhead = null;
+      return;
+    }
+    try {
+      Class<? extends FSDataInputStream> inputStreamClass = inputStream.getClass();
+      readAhead =
+          inputStreamClass.getMethod("adviseFile", new Class[] {adviceType, long.class, long.class});
+    } catch (NoSuchMethodException e) {
+      logger.info("Unable to call fadvise due to: {}", e.toString());
+      readAhead = null;
+      return;
+    }
+    if (readAhead != null) {
+      Object[] adviceTypeValues = adviceType.getEnumConstants();
+      for (int idx = 0; idx < adviceTypeValues.length; idx++) {
+        if ((adviceTypeValues[idx]).toString().contains("SEQUENTIAL")) {
+          try {
+            readAhead.invoke(inputStream, adviceTypeValues[idx], off, n);
+          } catch (IllegalAccessException e) {
+            logger.info("Unable to call fadvise due to: {}", e.toString());
+          } catch (InvocationTargetException e) {
+            logger.info("Unable to call fadvise due to: {}", e.toString());
+          }
+          break;
+        }
+      }
+    }
+    return;
+  }
+
+
+}
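
Typical use of the unbuffered variant mirrors the PageReader change above: open the file, position the stream at the start of the column chunk, and pull data into DrillBufs that the caller releases. A minimal sketch, not part of this patch; fs, path, allocator, columnChunkMetaData and pageLength are assumed to be in scope, as in the PageReader constructor:

    // Sketch only: read part of a column chunk through DirectBufInputStream.
    FSDataInputStream in = fs.open(path);
    DirectBufInputStream reader = new DirectBufInputStream(in, allocator, path.getName(),
        columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), true);
    reader.init();                              // seeks to the chunk start; issues fadvise if available
    DrillBuf buf = reader.getNext(pageLength);  // may return null at end of stream
    if (buf != null) {
      // ... use the bytes ...
      buf.release();                            // the caller owns the returned buffer
    }
    reader.close();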

