drill-commits mailing list archives

From par...@apache.org
Subject [04/10] drill git commit: DRILL-4800: Various fixes. Fix buffer underflow exception in BufferedDirectBufInputStream. Fix writer index for int64 dictionary encoded types. Added logging to help debug. Fix memory leaks. Work around issues with InputStream
Date Sat, 05 Nov 2016 00:11:31 GMT
DRILL-4800: Various fixes. Fix buffer underflow exception in BufferedDirectBufInputStream.
Fix writer index for int64 dictionary encoded types. Added logging to help debug. Fix memory
leaks. Work around issues with InputStream.available() (do not use hasRemainder; remove the
check for EOF in BufferedDirectBufInputStream.read()). Finalize defaults. Remove commented
code.
Addressed review comments.

This closes #611


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ee3489ce
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ee3489ce
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ee3489ce

Branch: refs/heads/master
Commit: ee3489ce3b6e5ad53e5c3a59b6e2e4e50773c630
Parents: 7f5acf8
Author: Parth Chandra <parthc@apache.org>
Authored: Tue Sep 13 21:47:49 2016 -0700
Committer: Parth Chandra <parthc@apache.org>
Committed: Fri Nov 4 15:57:44 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/drill/test/DrillTest.java   |  2 +-
 .../drill/exec/ops/OperatorContextImpl.java     |  3 +
 .../drill/exec/server/BootStrapContext.java     |  7 +-
 .../parquet/columnreaders/AsyncPageReader.java  | 71 +++++++-------
 .../parquet/columnreaders/ColumnReader.java     | 61 ++++--------
 .../NullableFixedByteAlignedReaders.java        |  3 +
 .../store/parquet/columnreaders/PageReader.java | 16 ++--
 .../columnreaders/ParquetRecordReader.java      | 20 ++--
 .../columnreaders/VarLenBinaryReader.java       | 86 +++--------------
 .../BufferedDirectBufInputStream.java           | 97 ++++++++++----------
 .../util/filereader/DirectBufInputStream.java   | 21 ++++-
 .../physical/impl/writer/TestParquetWriter.java |  3 +-
 12 files changed, 165 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/common/src/test/java/org/apache/drill/test/DrillTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java
index ccc297d..18c2c1a 100644
--- a/common/src/test/java/org/apache/drill/test/DrillTest.java
+++ b/common/src/test/java/org/apache/drill/test/DrillTest.java
@@ -55,7 +55,7 @@ public class DrillTest {
   static MemWatcher memWatcher;
   static String className;
 
-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000*10000);
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000);
   @Rule public final TestLogReporter logOutcome = LOG_OUTCOME;
 
   @Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false);

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 38ddd16..390b71c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -192,6 +192,9 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   }
 
   @Override
+  /*
+     Creates a DrillFileSystem that does not automatically track operator stats.
+   */
   public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
     Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
     fs = new DrillFileSystem(conf, null);

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
index adb6323..c498185 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
@@ -53,8 +53,7 @@ public class BootStrapContext implements AutoCloseable {
     this.config = config;
     this.classpathScan = classpathScan;
     this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-");
-    this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
-        "BitClient-");
+    this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-");
     // Note that metrics are stored in a static instance
     this.metrics = DrillMetrics.getRegistry();
     this.allocator = RootAllocatorFactory.newRoot(config);
@@ -79,8 +78,8 @@ public class BootStrapContext implements AutoCloseable {
     final int numScanDecodeThreads = (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE);
     final int scanThreadPoolSize =
         MIN_SCAN_THREADPOOL_SIZE > numScanThreads ? MIN_SCAN_THREADPOOL_SIZE : numScanThreads;
-    final int scanDecodeThreadPoolSize = numCores > numScanDecodeThreads ? numCores : numScanDecodeThreads;
-
+    final int scanDecodeThreadPoolSize =
+        (numCores + 1) / 2 > numScanDecodeThreads ? (numCores + 1) / 2 : numScanDecodeThreads;
     this.scanExecutor = Executors.newFixedThreadPool(scanThreadPoolSize, new NamedThreadFactory("scan-"));
     this.scanDecodeExecutor =
         Executors.newFixedThreadPool(scanDecodeThreadPoolSize, new NamedThreadFactory("scan-decode-"));
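
The hunk above changes the scan-decode pool sizing from the larger of the core count and the configured value to the larger of half the cores (rounded up) and the configured value. A minimal, hypothetical restatement of that arithmetic (the constant 4 below is a stand-in for the configured SCAN_DECODE_THREADPOOL_SIZE, not a Drill default):

    // Hypothetical illustration of the sizing rule in the hunk above; not Drill code.
    public class ScanDecodePoolSizing {
      public static void main(String[] args) {
        int numCores = Runtime.getRuntime().availableProcessors();
        int configuredDecodeThreads = 4;               // stand-in for SCAN_DECODE_THREADPOOL_SIZE
        int halfCoresRoundedUp = (numCores + 1) / 2;   // 8 cores -> 4 threads, 7 cores -> 4 threads
        int scanDecodeThreadPoolSize = Math.max(halfCoresRoundedUp, configuredDecodeThreads);
        System.out.println("scan-decode pool size = " + scanDecodeThreadPoolSize);
      }
    }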

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index b2bdef3..e2ba865 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -63,10 +63,12 @@ class AsyncPageReader extends PageReader {
     asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
   }
 
-  @Override protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
+  @Override
+  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
       final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
     if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
       try {
+        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() );
         dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
       } catch (IOException e) {
         handleAndThrowException(e, "Error Reading dictionary page.");
@@ -90,12 +92,12 @@ class AsyncPageReader extends PageReader {
       isDictionary = readStatus.isDictionaryPage;
     }
     if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf uncompressedData = data;
-      data = decompress(readStatus.getPageHeader(), uncompressedData);
+      DrillBuf compressedData = data;
+      data = decompress(readStatus.getPageHeader(), compressedData);
       synchronized (this) {
         readStatus.setPageData(null);
       }
-      uncompressedData.release();
+      compressedData.release();
     } else {
       if (isDictionary) {
         stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
@@ -160,23 +162,12 @@ class AsyncPageReader extends PageReader {
     pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
     try {
       timer.start();
-      if (logger.isTraceEnabled()) {
-        logger.trace("Decompress (1)==> Col: {}  readPos: {}  compressed_size: {}  compressedPageData:
{}",
-            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
-            pageHeader.getCompressed_page_size(), ByteBufUtil.hexDump(compressedData));
-      }
       CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
       ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
       ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
       DecompressionHelper decompressionHelper = new DecompressionHelper(codecName);
       decompressionHelper.decompress(input, compressedSize, output, uncompressedSize);
       pageDataBuf.writerIndex(uncompressedSize);
-      if (logger.isTraceEnabled()) {
-        logger.trace(
-            "Decompress (2)==> Col: {}  readPos: {}  uncompressed_size: {}  uncompressedPageData:
{}",
-            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
-            pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageDataBuf));
-      }
       timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
     } catch (IOException e) {
@@ -219,30 +210,23 @@ class AsyncPageReader extends PageReader {
       }
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
-    if (dataReader.hasRemainder() && parentColumnReader.totalValuesRead + readStatus.getValuesRead()
+    if (parentColumnReader.totalValuesRead + readStatus.getValuesRead()
         < parentColumnReader.columnChunkMetaData.getValueCount()) {
       asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
     }
 
     pageHeader = readStatus.getPageHeader();
     pageData = getDecompressedPageData(readStatus);
-    if (logger.isTraceEnabled()) {
-      logger.trace("AsyncPageReader: Col: {}  pageData: {}",
-          this.parentColumnReader.columnChunkMetaData.toString(), ByteBufUtil.hexDump(pageData));
-      logger.trace("AsyncPageReaderTask==> Col: {}  readPos: {}  Uncompressed_size: {}
 pageData: {}",
-          parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
-          pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageData));
-    }
+
 
   }
 
 
   @Override public void clear() {
     if (asyncPageRead != null) {
-      asyncPageRead.cancel(true);
       try {
-        ReadStatus r = asyncPageRead.get();
-        r.getPageData().release();
+        final ReadStatus readStatus = asyncPageRead.get();
+        readStatus.getPageData().release();
       } catch (Exception e) {
         // Do nothing.
       }
@@ -319,7 +303,8 @@ class AsyncPageReader extends PageReader {
       ReadStatus readStatus = new ReadStatus();
 
       String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName(parent.parentColumnReader.columnChunkMetaData.toString());
+      String name = parent.parentColumnReader.columnChunkMetaData.toString();
+      Thread.currentThread().setName(name);
 
       long bytesRead = 0;
       long valuesRead = 0;
@@ -327,10 +312,28 @@ class AsyncPageReader extends PageReader {
 
       DrillBuf pageData = null;
       try {
+        long s = parent.dataReader.getPos();
         PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
+        long e = parent.dataReader.getPos();
+        if (logger.isTraceEnabled()) {
+          logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name,
s, e - s);
+        }
         int compressedSize = pageHeader.getCompressed_page_size();
+        s = parent.dataReader.getPos();
         pageData = parent.dataReader.getNext(compressedSize);
+        e = parent.dataReader.getPos();
         bytesRead = compressedSize;
+
+        if (logger.isTraceEnabled()) {
+          DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
+          int endOffset = compressedSize>100?compressedSize-100:0;
+          DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
+          logger
+              .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start =
{} : Buf End = {} ",
+                  name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
+
+        }
+
         synchronized (parent) {
           if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
             readStatus.setIsDictionaryPage(true);
@@ -353,10 +356,6 @@ class AsyncPageReader extends PageReader {
         throw e;
       }
       Thread.currentThread().setName(oldname);
-      if(logger.isTraceEnabled()) {
-        logger.trace("AsyncPageReaderTask==> Col: {}  readPos: {}  bytesRead: {}  pageData:
{}", parent.parentColumnReader.columnChunkMetaData.toString(),
-            parent.dataReader.getPos(), bytesRead, ByteBufUtil.hexDump(pageData));
-      }
       return readStatus;
     }
 
@@ -365,7 +364,7 @@ class AsyncPageReader extends PageReader {
   private class DecompressionHelper {
     final CompressionCodecName codecName;
 
-    public DecompressionHelper(CompressionCodecName codecName){
+    public DecompressionHelper(CompressionCodecName codecName) {
       this.codecName = codecName;
     }
 
@@ -376,6 +375,7 @@ class AsyncPageReader extends PageReader {
       // expensive copying.
       if (codecName == CompressionCodecName.GZIP) {
         GzipCodec codec = new GzipCodec();
+        // DirectDecompressor: @see https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/compress/DirectDecompressor.html
         DirectDecompressor directDecompressor = codec.createDirectDecompressor();
         if (directDecompressor != null) {
           logger.debug("Using GZIP direct decompressor.");
@@ -394,9 +394,10 @@ class AsyncPageReader extends PageReader {
           output.put(outputBytes);
         }
       } else if (codecName == CompressionCodecName.SNAPPY) {
-        // For Snappy, just call the Snappy decompressor directly.
-        // It is thread safe. The Hadoop layers though, appear to be
-        // not quite reliable in a multithreaded environment
+        // For Snappy, just call the Snappy decompressor directly instead
+        // of going thru the DirectDecompressor class.
+        // The Snappy codec is itself thread safe, while going thru the DirectDecompressor path
+        // seems to have concurrency issues.
         output.clear();
         int size = Snappy.uncompress(input, output);
         output.limit(size);
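
As the comment above notes, the Snappy path now calls the Snappy library directly on two direct ByteBuffers rather than going through Hadoop's DirectDecompressor. For reference, a self-contained sketch of that direct call, assuming the org.xerial.snappy snappy-java artifact is on the classpath (the round-trip data is made up for illustration):

    import java.nio.ByteBuffer;
    import org.xerial.snappy.Snappy;

    // Hedged sketch of the direct Snappy ByteBuffer call; both buffers must be direct buffers.
    public class SnappyDirectExample {
      public static void main(String[] args) throws java.io.IOException {
        byte[] original = "some page data to round-trip".getBytes();
        byte[] compressedBytes = Snappy.compress(original);

        ByteBuffer input = ByteBuffer.allocateDirect(compressedBytes.length);
        input.put(compressedBytes);
        input.flip();

        ByteBuffer output = ByteBuffer.allocateDirect(original.length);
        output.clear();
        int size = Snappy.uncompress(input, output);  // thread safe; no Hadoop codec layer involved
        output.limit(size);
      }
    }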

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index 29e23bc..73cbc3d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -163,22 +163,13 @@ public abstract class ColumnReader<V extends ValueVector> {
 
   protected abstract void readField(long recordsToRead);
 
-  /*
-  public Future<Boolean> determineSizeAsync(long recordsReadInCurrentPass,
-      Integer lengthVarFieldsInCurrentRecord) throws IOException {
-    Future<Boolean> r = threadPool.submit(
-        new ColumnReaderDetermineSizeTask(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord));
-    return r;
-  }
-  */
-
   /**
    * Determines the size of a single value in a variable column.
    *
    * Return value indicates if we have finished a row group and should stop reading
    *
    * @param recordsReadInCurrentPass
-   * @ param lengthVarFieldsInCurrentRecord
+   * @param lengthVarFieldsInCurrentRecord
    * @return - true if we should stop reading
    * @throws IOException
    */
@@ -194,7 +185,7 @@ public abstract class ColumnReader<V extends ValueVector> {
       return true;
     }
 
-    //lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+    // Never used in this code path. Hard to remove because the method is overidden by subclasses
     lengthVarFieldsInCurrentRecord = -1;
 
     doneReading = checkVectorCapacityReached();
@@ -307,41 +298,18 @@ public abstract class ColumnReader<V extends ValueVector> {
     @Override public Long call() throws IOException{
 
       String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString());
-
-      this.parent.processPages(recordsToReadInThisPass);
-
-      Thread.currentThread().setName(oldname);
-      return recordsToReadInThisPass;
-    }
-
-  }
-
-  /*
-  private class ColumnReaderDetermineSizeTask implements Callable<Boolean> {
-
-    private final ColumnReader parent = ColumnReader.this;
-    private final long recordsReadInCurrentPass;
-    private final Integer lengthVarFieldsInCurrentRecord;
-
-    public ColumnReaderDetermineSizeTask(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord){
-      this.recordsReadInCurrentPass = recordsReadInCurrentPass;
-      this.lengthVarFieldsInCurrentRecord = lengthVarFieldsInCurrentRecord;
-    }
-
-    @Override public Boolean call() throws IOException{
-
-      String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString());
+      try {
+        Thread.currentThread().setName(oldname + "Decode-" + this.parent.columnChunkMetaData.toString());
 
-      boolean b = this.parent.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
+        this.parent.processPages(recordsToReadInThisPass);
+        return recordsToReadInThisPass;
 
-      Thread.currentThread().setName(oldname);
-      return b;
+      } finally {
+        Thread.currentThread().setName(oldname);
+      }
     }
 
   }
-  */
 
   private class ColumnReaderReadRecordsTask implements Callable<Integer> {
 
@@ -355,12 +323,15 @@ public abstract class ColumnReader<V extends ValueVector> {
     @Override public Integer call() throws IOException{
 
       String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName("Decode-"+this.parent.columnChunkMetaData.toString());
+      try {
+        Thread.currentThread().setName("Decode-" + this.parent.columnChunkMetaData.toString());
 
-      this.parent.readRecords(recordsToRead);
+        this.parent.readRecords(recordsToRead);
+        return recordsToRead;
 
-      Thread.currentThread().setName(oldname);
-      return recordsToRead;
+      } finally {
+        Thread.currentThread().setName(oldname);
+      }
     }
 
   }
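
Both task classes above now rename the worker thread inside a try block and restore the old name in a finally block, so the name is reset even when processPages() or readRecords() throws. The general shape of that pattern, as a small stand-alone sketch (the column name is invented):

    // General pattern shown by the change above; not Drill code.
    public class ThreadNameRestore {
      public static void main(String[] args) {
        String oldName = Thread.currentThread().getName();
        try {
          Thread.currentThread().setName(oldName + "Decode-exampleColumn");
          // ... decode work that may throw ...
        } finally {
          Thread.currentThread().setName(oldName);  // always restored, even if the work throws
        }
      }
    }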

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index f4fe5ee..e20504f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -150,7 +150,10 @@ public class NullableFixedByteAlignedReaders {
         for (int i = 0; i < recordsToReadInThisPass; i++){
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
         }
+        int writerIndex = castedBaseVector.getBuffer().writerIndex();
+        castedBaseVector.getBuffer().setIndex(0, writerIndex + (int)readLength);
       } else {
+
         for (int i = 0; i < recordsToReadInThisPass; i++){
           valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger());
         }
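
The added lines advance the value vector buffer's writer index after the dictionary-encoded values have been written through the mutator, so the buffer reports the bytes actually populated (the int64 writer-index fix from the commit message). A rough, hypothetical sketch of that bookkeeping, using a plain Netty ByteBuf in place of DrillBuf and assuming 8 bytes per int64 value:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    // Hypothetical sketch of the writer-index bookkeeping; ByteBuf stands in for DrillBuf.
    public class WriterIndexExample {
      public static void main(String[] args) {
        int valuesWritten = 100;                          // values decoded from the dictionary
        int bytesPerValue = 8;                            // assumed 8 bytes per int64 value
        int readLength = valuesWritten * bytesPerValue;

        ByteBuf vectorBuffer = Unpooled.buffer(readLength);
        // ... values written via set methods do not move the writer index ...
        int writerIndex = vectorBuffer.writerIndex();
        vectorBuffer.setIndex(0, writerIndex + readLength);  // make writerIndex cover the written data
        vectorBuffer.release();
      }
    }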

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 0736f01..f71eeae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBufUtil;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
 import io.netty.buffer.ByteBuf;
@@ -63,9 +64,8 @@ class PageReader {
   public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter;
 
   protected final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentColumnReader;
-  //private final ColumnDataReader dataReader;
   protected final DirectBufInputStream dataReader;
-  //der; buffer to store bytes of current page
+  //buffer to store bytes of current page
   protected DrillBuf pageData;
 
   // for variable length data we need to keep track of our current position in the page data
@@ -189,6 +189,11 @@ class PageReader {
     if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
       timer.start();
       pageDataBuf = dataReader.getNext(compressedSize);
+      if (logger.isTraceEnabled()) {
+        logger.trace("PageReaderTask==> Col: {}  readPos: {}  Uncompressed_size: {}  pageData:
{}",
+            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
+            pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageData));
+      }
       timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize);
     } else {
@@ -247,9 +252,6 @@ class PageReader {
       }
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
-    //TODO: Handle buffer allocation exception
-
-    //allocatePageData(pageHeader.getUncompressed_page_size());
     int compressedSize = pageHeader.getCompressed_page_size();
     int uncompressedSize = pageHeader.getUncompressed_page_size();
     pageData = readPage(pageHeader, compressedSize, uncompressedSize);
@@ -270,7 +272,7 @@ class PageReader {
 
     // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause
     // and submit a bug report
-    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
+    if(parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
       return false;
     }
     clearBuffers();
@@ -395,7 +397,7 @@ class PageReader {
 
   public void clear(){
     try {
-      this.inputStream.close();
+      // data reader also owns the input stream and will close it.
       this.dataReader.close();
     } catch (IOException e) {
       //Swallow the exception which is OK for input streams

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 4f0e3b5..69f6a62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -476,11 +476,11 @@ public class ParquetRecordReader extends AbstractRecordReader {
    if(useAsyncColReader){
     readAllFixedFieldsParallel(recordsToRead) ;
    } else {
-     readAllFixedFieldsiSerial(recordsToRead); ;
+     readAllFixedFieldsSerial(recordsToRead); ;
    }
  }
 
-  public void readAllFixedFieldsiSerial(long recordsToRead) throws IOException {
+  public void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
     for (ColumnReader<?> crs : columnStatuses) {
       crs.processPages(recordsToRead);
     }
@@ -492,14 +492,22 @@ public class ParquetRecordReader extends AbstractRecordReader {
       Future<Long> f = crs.processPagesAsync(recordsToRead);
       futures.add(f);
     }
+    Exception exception = null;
     for(Future f: futures){
-      try {
-        f.get();
-      } catch (Exception e) {
+      if(exception != null) {
         f.cancel(true);
-        handleAndRaise(null, e);
+      } else {
+        try {
+          f.get();
+        } catch (Exception e) {
+          f.cancel(true);
+          exception = e;
+        }
       }
     }
+    if(exception != null){
+      handleAndRaise(null, exception);
+    }
   }
 
   @Override
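
The rewritten loop remembers the first failure, cancels every remaining future instead of abandoning it, and raises only after all futures have been visited; the same pattern is applied in VarLenBinaryReader further below. A generic, stand-alone sketch of that idiom (not Drill code):

    import java.util.List;
    import java.util.concurrent.Future;

    // Generic sketch of the "remember the first failure, cancel the rest" loop used above.
    public class FirstFailureWins {
      static void awaitAll(List<Future<Long>> futures) throws Exception {
        Exception exception = null;
        for (Future<Long> f : futures) {
          if (exception != null) {
            f.cancel(true);            // something already failed; stop the remaining tasks
          } else {
            try {
              f.get();                 // wait for this task to finish
            } catch (Exception e) {
              f.cancel(true);
              exception = e;           // keep the first failure, keep draining the list
            }
          }
        }
        if (exception != null) {
          throw exception;             // surface the first failure once every future was handled
        }
      }
    }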

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
index c78dc7a..7bcce11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java
@@ -57,11 +57,7 @@ public class VarLenBinaryReader {
       columnReader.reset();
     }
 
-    //if(useAsyncTasks){
-    //  recordsReadInCurrentPass = determineSizesParallel(recordsToReadInThisPass);
-    //} else {
-      recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
-    //}
+    recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
     if(useAsyncTasks){
       readRecordsParallel(recordsReadInCurrentPass);
     }else{
@@ -102,71 +98,6 @@ public class VarLenBinaryReader {
     return recordsReadInCurrentPass;
   }
 
-
-  public long determineSizesParallel(long recordsToReadInThisPass ) throws IOException {
-    boolean doneReading = false;
-    int lengthVarFieldsInCurrentRecord = 0;
-    boolean exitLengthDeterminingLoop = false;
-    long totalVariableLengthData = 0;
-    long recordsReadInCurrentPass = 0;
-
-    do {
-    doneReading = readPagesParallel();
-
-    if (!doneReading) {
-      lengthVarFieldsInCurrentRecord = 0;
-      for (VarLengthColumn<?> columnReader : columns) {
-        doneReading = columnReader.processPageData((int) recordsReadInCurrentPass);
-        if(doneReading) {
-          break;
-        }
-        lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
-        doneReading = columnReader.checkVectorCapacityReached();
-        if(doneReading) {
-          break;
-        }
-      }
-    }
-
-    exitLengthDeterminingLoop = doneReading;
-
-      // check that the next record will fit in the batch
-      if (exitLengthDeterminingLoop ||
-          (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields()
-              + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) {
-        break;
-      }
-      for (VarLengthColumn<?> columnReader : columns) {
-        columnReader.updateReadyToReadPosition();
-        columnReader.currDefLevel = -1;
-      }
-      recordsReadInCurrentPass++;
-      totalVariableLengthData += lengthVarFieldsInCurrentRecord;
-    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
-
-    return recordsReadInCurrentPass;
-  }
-
-  public boolean readPagesParallel() {
-
-    boolean isDone = false;
-    ArrayList<Future<Boolean>> futures = Lists.newArrayList();
-    for (VarLengthColumn<?> columnReader : columns) {
-      Future<Boolean> f = columnReader.readPageAsync();
-      futures.add(f);
-    }
-    for (Future<Boolean> f : futures) {
-      try {
-        isDone = isDone || f.get().booleanValue();
-      } catch (Exception e) {
-        f.cancel(true);
-        handleAndRaise(null, e);
-      }
-    }
-    return isDone;
-  }
-
-
   private void readRecordsSerial(long recordsReadInCurrentPass) {
     for (VarLengthColumn<?> columnReader : columns) {
       columnReader.readRecords(columnReader.pageReader.valuesReadyToRead);
@@ -182,12 +113,17 @@ public class VarLenBinaryReader {
       Future<Integer> f = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead);
       futures.add(f);
     }
-    for (Future f : futures) {
-      try {
-        f.get();
-      } catch (Exception e) {
+    Exception exception = null;
+    for(Future f: futures){
+      if(exception != null) {
         f.cancel(true);
-        handleAndRaise(null, e);
+      } else {
+        try {
+          f.get();
+        } catch (Exception e) {
+          f.cancel(true);
+          exception = e;
+        }
       }
     }
     for (VarLengthColumn<?> columnReader : columns) {

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
index a5a6b81..327c9a1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java
@@ -19,24 +19,13 @@ package org.apache.drill.exec.util.filereader;
 
 import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.RootAllocatorFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.Footer;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.BlockMetaData;
-import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.util.CompatibilityUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.util.List;
 
 /**
  * <code>BufferedDirectBufInputStream</code>  reads from the
@@ -52,8 +41,9 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
   private static final org.slf4j.Logger logger =
       org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class);
 
-  private static int defaultBufferSize = 8192 * 1024; // 8 MiB
-  private static int defaultTempBufferSize = 8192; // 8 KiB
+  private static final int DEFAULT_BUFFER_SIZE = 8192 * 1024; // 8 MiB
+  private static final int DEFAULT_TEMP_BUFFER_SIZE = 8192; // 8 KiB
+  private static final int SMALL_BUFFER_SIZE = 64 * 1024; // 64 KiB
 
   /**
    * The internal buffer to keep data read from the underlying inputStream.
@@ -82,11 +72,10 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
 
   protected long curPosInStream; // current offset in the input stream
 
-  private final int bufSize;
+  private int bufSize;
 
   private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len)
 
-
   private DrillBuf getBuf() throws IOException {
     checkInputStreamState();
     if (internalBuffer == null) {
@@ -101,7 +90,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
    */
   public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id,
       long startOffset, long totalByteSize, boolean enableHints) {
-    this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints);
+    this(in, allocator, id, startOffset, totalByteSize, DEFAULT_BUFFER_SIZE, enableHints);
   }
 
   /**
@@ -130,13 +119,21 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
   @Override public void init() throws UnsupportedOperationException, IOException {
     super.init();
     this.internalBuffer = this.allocator.buffer(this.bufSize);
-    this.tempBuffer = this.allocator.buffer(defaultTempBufferSize);
+    this.tempBuffer = this.allocator.buffer(DEFAULT_TEMP_BUFFER_SIZE);
     int bytesRead = getNextBlock();
     if (bytesRead <= 0) {
       throw new IOException("End of stream reached while initializing buffered reader.");
     }
   }
 
+  private DrillBuf reallocBuffer(int newSize ){
+    this.internalBuffer.release();
+    this.bufSize = newSize;
+    this.internalBuffer = this.allocator.buffer(this.bufSize);
+    logger.debug("Internal buffer resized to {}", newSize);
+    return this.internalBuffer;
+  }
+
   /**
    * Read one more block from the underlying stream.
    * Assumes we have reached the end of buffered data
@@ -152,11 +149,14 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     this.count = this.curPosInBuffer = 0;
 
     // We *cannot* rely on the totalByteSize being correct because
-    // metadata for Parquet files is incorrect. So we read as
-    // much as we can up to the size of the buffer
-    //int bytesToRead = buffer.capacity() <= (totalByteSize + startOffset - curPosInStream ) ?
-    //    buffer.Capacity() :
-    //    (int) (totalByteSize + startOffset - curPosInStream );
+    // metadata for Parquet files is incorrect (sometimes). So we read
+    // beyond the totalByteSize parameter. However, to prevent ourselves from reading too
+    // much data, we reduce the size of the buffer, down to 64KiB.
+    if (buffer.capacity() >= (totalByteSize + startOffset - curPosInStream)) {
+      if (buffer.capacity() > SMALL_BUFFER_SIZE) {
+        buffer = this.reallocBuffer(SMALL_BUFFER_SIZE);
+      }
+    }
     int bytesToRead = buffer.capacity();
 
     ByteBuffer directBuffer = buffer.nioBuffer(curPosInBuffer, bytesToRead);
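
The new check above shrinks the internal buffer once its capacity would span past the declared end of the column chunk, so reads that go beyond a possibly wrong totalByteSize overshoot by at most 64 KiB. A condensed, hypothetical restatement of that rule as a pure function (field names follow the hunk above):

    // Condensed sketch of the buffer-shrinking rule in getNextBlock(); illustrative only.
    public class BufferShrinkRule {
      private static final int SMALL_BUFFER_SIZE = 64 * 1024;   // 64 KiB, as in the constants above

      static int nextReadSize(int bufferCapacity, long startOffset, long totalByteSize, long curPosInStream) {
        long remainingDeclared = totalByteSize + startOffset - curPosInStream;
        if (bufferCapacity >= remainingDeclared && bufferCapacity > SMALL_BUFFER_SIZE) {
          bufferCapacity = SMALL_BUFFER_SIZE;   // cap any overshoot past a wrong totalByteSize at 64 KiB
        }
        return bufferCapacity;                  // read up to the (possibly reduced) capacity
      }
    }
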
@@ -171,6 +171,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
         nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead);
       } catch (Exception e) {
         logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage());
+        throw new IOException((e));
       }
       if (nBytes > 0) {
         buffer.writerIndex(nBytes);
@@ -269,12 +270,13 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
         }
       } else {
         bytesRead += nRead;
+        //TODO: Uncomment this when the InputStream.available() call is fixed.
         // If the last read caused us to reach the end of stream
-        // we are done
-        InputStream input = in;
-        if (input != null && input.available() <= 0) {
-          return bytesRead;
-        }
+        // we are done.
+        //InputStream input = in;
+        //if (input != null && input.available() <= 0) {
+        //  return bytesRead;
+        //}
       }
     } while (bytesRead < len);
     return bytesRead;
@@ -294,7 +296,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
       return 0;
     }
     DrillBuf byteBuf;
-    if (len <= defaultTempBufferSize) {
+    if (len <= DEFAULT_TEMP_BUFFER_SIZE) {
       byteBuf = tempBuffer;
     } else {
       byteBuf = this.allocator.buffer(len);
@@ -310,13 +312,13 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
           return bytesRead;
         }
       } else {
-        byteBuf.nioBuffer().get(buf, off + bytesRead, len - bytesRead);
+        byteBuf.nioBuffer().get(buf, off + bytesRead, nRead);
         byteBuf.clear();
         bytesRead += nRead;
       }
     } while (bytesRead < len);
 
-    if (len > defaultTempBufferSize) {
+    if (len > DEFAULT_TEMP_BUFFER_SIZE) {
       byteBuf.release();
     }
 
@@ -380,27 +382,26 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement
     return curPosInBuffer + startOffset;
   }
 
-  public boolean hasRemainder() throws IOException {
-    return available() > 0;
-  }
-
   public void close() throws IOException {
     DrillBuf buffer;
     InputStream inp;
-    if ((inp = in) != null) {
-      in = null;
-      inp.close();
-    }
-    if ((buffer = this.internalBuffer) != null) {
-      synchronized (this) {
-        this.internalBuffer = null;
-        buffer.release();
-      }
-    }
-    if ((buffer = this.tempBuffer) != null) {
-      synchronized (this) {
-        this.tempBuffer = null;
-        buffer.release();
+    synchronized (this) {
+      try {
+        if ((inp = in) != null) {
+          in = null;
+          inp.close();
+        }
+      } catch (IOException e) {
+        throw e;
+      } finally {
+        if ((buffer = this.internalBuffer) != null) {
+          this.internalBuffer = null;
+          buffer.release();
+        }
+        if ((buffer = this.tempBuffer) != null) {
+          this.tempBuffer = null;
+          buffer.release();
+        }
       }
     }
   }
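
close() now runs the whole teardown under the object lock and releases both DrillBufs in a finally block, so a failure while closing the underlying stream can no longer leak the buffers. The shape of that pattern, sketched with a plain Netty ByteBuf standing in for DrillBuf:

    import io.netty.buffer.ByteBuf;
    import java.io.IOException;
    import java.io.InputStream;

    // Shape of the reworked close(): the stream close may throw, the buffers are still released.
    public class CloseShape {
      private InputStream in;
      private ByteBuf internalBuffer;
      private ByteBuf tempBuffer;

      public synchronized void close() throws IOException {
        try {
          InputStream inp = in;
          if (inp != null) {
            in = null;
            inp.close();              // may throw; the buffer cleanup below still runs
          }
        } finally {
          if (internalBuffer != null) { internalBuffer.release(); internalBuffer = null; }
          if (tempBuffer != null)     { tempBuffer.release();     tempBuffer = null; }
        }
      }
    }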

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
index 71c36e6..f11ad1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java
@@ -89,7 +89,13 @@ public class DirectBufInputStream extends FilterInputStream {
 
   public synchronized DrillBuf getNext(int bytes) throws IOException {
     DrillBuf b = allocator.buffer(bytes);
-    int bytesRead = read(b, 0, bytes);
+    int bytesRead = -1;
+    try {
+    bytesRead = read(b, 0, bytes);
+    } catch (IOException e){
+      b.release();
+      throw e;
+    }
     if (bytesRead <= -1) {
       b.release();
       return null;
@@ -102,7 +108,10 @@ public class DirectBufInputStream extends FilterInputStream {
   }
 
   public boolean hasRemainder() throws IOException {
-    return getInputStream().available() > 0;
+    // We use the following instead of "getInputStream.available() > 0" because
+    // available() on HDFS seems to have issues with file sizes
+    // that are greater than Integer.MAX_VALUE
+    return (this.getPos() < (this.startOffset + this.totalByteSize));
   }
 
   protected FSDataInputStream getInputStream() throws IOException {
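
hasRemainder() now compares the 64-bit stream position against the declared end of the column chunk instead of relying on InputStream.available(), which returns an int and, per the comment above, misbehaves on HDFS for sizes beyond Integer.MAX_VALUE. A small illustration of why the position-based check avoids int truncation (the sizes are invented for the example):

    // Illustration of the position-based end-of-chunk check; values are made up.
    public class HasRemainderCheck {
      public static void main(String[] args) {
        long startOffset   = 3L * 1024 * 1024 * 1024;   // column chunk starts 3 GiB into the file
        long totalByteSize = 256L * 1024 * 1024;        // declared chunk size: 256 MiB
        long pos           = startOffset + 100;         // current stream position
        boolean hasRemainder = pos < (startOffset + totalByteSize);  // all 64-bit arithmetic
        System.out.println(hasRemainder);
        // InputStream.available() returns an int, so it cannot represent more than ~2 GiB remaining.
      }
    }
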
@@ -117,6 +126,14 @@ public class DirectBufInputStream extends FilterInputStream {
     }
   }
 
+  public synchronized void close() throws IOException {
+    InputStream inp;
+    if ((inp = in) != null) {
+      in = null;
+      inp.close();
+    }
+  }
+
   protected void checkStreamSupportsByteBuffer() throws UnsupportedOperationException {
     // Check input stream supports ByteBuffer
     if (!(in instanceof ByteBufferReadable)) {

http://git-wip-us.apache.org/repos/asf/drill/blob/ee3489ce/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 56b94d7..ae0e699 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -929,6 +929,7 @@ public class TestParquetWriter extends BaseTestQuery {
     }
   }
 
+  @Ignore ("Used to test decompression in AsyncPageReader. Takes too long.")
   @Test
   public void testTPCHReadWriteRunRepeated() throws Exception {
     for (int i = 1; i <= repeat; i++) {
@@ -945,7 +946,6 @@ public class TestParquetWriter extends BaseTestQuery {
     try {
       test(String.format("alter session set `%s` = 'gzip'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
       String inputTable = "cp.`tpch/supplier.parquet`";
-//      runTestAndValidate("s_suppkey, s_nationkey, s_acctbal", "s_suppkey, s_nationkey, s_acctbal", inputTable, "suppkey_parquet_dict_gzip");
         runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_gzip");
     } finally {
       test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR.getDefault().string_val));
@@ -957,7 +957,6 @@ public class TestParquetWriter extends BaseTestQuery {
     try {
       test(String.format("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE));
       String inputTable = "cp.`supplier_snappy.parquet`";
-      //      runTestAndValidate("s_suppkey, s_nationkey, s_acctbal", "s_suppkey, s_nationkey, s_acctbal", inputTable, "suppkey_parquet_dict_gzip");
       runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy");
     } finally {
       test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE,
ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR.getDefault().string_val));

