drill-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudheeshkatkam <...@git.apache.org>
Subject [GitHub] drill pull request #723: Improve Parquet Scan pipelining.
Date Thu, 26 Jan 2017 00:08:00 GMT
Github user sudheeshkatkam commented on a diff in the pull request:

    https://github.com/apache/drill/pull/723#discussion_r97904187
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java ---
    @@ -192,45 +235,74 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
             stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
           }
           pageHeader = readStatus.getPageHeader();
    -      // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed
    -      asyncPageRead = null;
    -    } catch (Exception e) {
    -      handleAndThrowException(e, "Error reading page data.");
    -    }
     
         // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one
         // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary
     
    -    do {
    -      if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
    -        readDictionaryPageData(readStatus, parentColumnReader);
    -        // Ugly. Use the Async task to make a synchronous read call.
    -        readStatus = new AsyncPageReaderTask().call();
    -        pageHeader = readStatus.getPageHeader();
    -      }
    -    } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
    -
    -    if (parentColumnReader.totalValuesRead + readStatus.getValuesRead()
    -        < parentColumnReader.columnChunkMetaData.getValueCount()) {
    -      asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
    -    }
    +      do {
    +        if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
    +          readDictionaryPageData(readStatus, parentColumnReader);
    +          asyncPageRead.poll().get(); // get the result of execution
    +          synchronized (pageQueue) {
    +            boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
    +            readStatus = pageQueue.take(); // get the data if no exception has been thrown
    +            if (readStatus.pageData == null || readStatus == ReadStatus.EMPTY) {
    +              break;
    +            }
    +            //if the queue was full before we took a page out, then there would
    +            // have been no new read tasks scheduled. In that case, schedule a new read.
    +            if (pageQueueFull) {
    +              asyncPageRead.offer(threadPool.submit(new AsyncPageReaderTask(debugName, pageQueue)));
    +            }
    +          }
    +          assert (readStatus.pageData != null);
    +          pageHeader = readStatus.getPageHeader();
    +        }
    +      } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
     
         pageHeader = readStatus.getPageHeader();
         pageData = getDecompressedPageData(readStatus);
    -
    +    assert(pageData != null);
    +    } catch (InterruptedException e) {
    +      Thread.currentThread().interrupt();
    +    } catch (Exception e){
    +      handleAndThrowException(e, "Error reading page data");
    +    }
     
       }
     
    -
       @Override public void clear() {
    -    if (asyncPageRead != null) {
    +    while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
           try {
    -        final ReadStatus readStatus = asyncPageRead.get();
    -        readStatus.getPageData().release();
    +        Future<Boolean> f = asyncPageRead.poll();
    +        if(!f.isDone() && !f.isCancelled()){
    +          f.cancel(true);
    +        } else {
    +          Boolean b = f.get(1, TimeUnit.MILLISECONDS);
    --- End diff --
    
    Remove the assignment (`Boolean b = ...`) on the line above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message