From: selva@apache.org
To: commits@trafodion.apache.org
Date: Fri, 16 Feb 2018 20:00:37 -0000
Subject: [4/9] trafodion git commit: [TRAFODION-2917] Refactor Trafodion
 implementation of hdfs scan for text format hive tables

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text format hive tables

Part-3 changes

Changes to ensure that multiple chunks and ranges of the hdfs scan work correctly with the
refactored code.

Pending issues:
 - Statistics need to be populated.
 - ESPs should be assigned their ranges in advance to avoid duplicate scans.

Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/202a040e
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/202a040e
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/202a040e

Branch: refs/heads/master
Commit: 202a040ea9eb8ae3ea79329ee14048c4fe2f082c
Parents: f17e15e
Author: selvaganesang
Authored: Fri Feb 2 07:46:48 2018 +0000
Committer: selvaganesang
Committed: Fri Feb 2 07:46:48 2018 +0000

----------------------------------------------------------------------
 core/sql/executor/ExHdfsScan.cpp                | 54 ++++++++----
 core/sql/executor/ExHdfsScan.h                  |  5 +-
 .../main/java/org/trafodion/sql/HDFSClient.java | 86 +++++++-------------
 .../main/java/org/trafodion/sql/HdfsScan.java   | 69 +++++++++-------
 4 files changed, 113 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
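Before the per-file diffs, it may help to see the contract the refactored scan relies on: each
call to HdfsScan.trafHdfsRead() hands back one filled buffer described by the four-element
array [bytesRead, bufNo, rangeNo, isEOF], and null once all ranges are exhausted. The array
layout is taken from the retArray_ usage in the diffs below; everything else in this minimal,
runnable model is hypothetical stub data, not Trafodion source:

    public class ScanContractSketch
    {
       // Stub for HdfsScan.trafHdfsRead(): yields two filled buffers, then null.
       static final int[][] RESULTS = { {1024, 0, 0, 0}, {512, 1, 0, 1} };
       static int call = 0;

       static int[] trafHdfsRead()
       {
          return (call < RESULTS.length) ? RESULTS[call++] : null;
       }

       public static void main(String[] args)
       {
          int[] ret;
          while ((ret = trafHdfsRead()) != null)
             System.out.println("bytesRead=" + ret[0] + " bufNo=" + ret[1] +
                                " rangeNo=" + ret[2] + " isEOF=" + ret[3]);
       }
    }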
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index e29baf6..730f0dc 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -143,7 +143,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   }
   bufBegin_ = NULL;
   bufEnd_ = NULL;
-  logicalBufEnd_ = NULL;
+  bufLogicalEnd_ = NULL;
   headRoomCopied_ = 0;
   prevRangeNum_ = -1;
   currRangeBytesRead_ = 0;
@@ -567,11 +567,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
           }
           bufBegin_ = NULL;
           bufEnd_ = NULL;
-          logicalBufEnd_ = NULL;
+          bufLogicalEnd_ = NULL;
           headRoomCopied_ = 0;
           prevRangeNum_ = -1;
           currRangeBytesRead_ = 0;
           recordSkip_ = FALSE;
+          extraBytesRead_ = 0;
           step_ = TRAF_HDFS_READ;
        }
        break;
@@ -589,25 +590,28 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
              break;
           }
           // Assign the starting address of the buffer
+          hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
           bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
-          if (retArray_[IS_EOF])
-             logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
-          else if (retArray_[BYTES_COMPLETED] < hdfsScanBufMaxSize_)
-             logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - headRoom_;
-          else
-             logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
-          hdfo_ = getRange(retArray_[RANGE_NO]);
           if (retArray_[RANGE_NO] != prevRangeNum_) {
+             currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
              bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
-             if (hdfo_->getStartOffset() == 0)
+             if (hdfo->getStartOffset() == 0)
                 recordSkip_ = FALSE;
              else
                 recordSkip_ = TRUE;
           } else {
+             currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
              bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
              recordSkip_ = FALSE;
-          }
+          }
+          if (currRangeBytesRead_ > hdfo->getBytesToRead())
+             extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead();
+          else
+             extraBytesRead_ = 0;
+          bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
+          prevRangeNum_ = retArray_[RANGE_NO];
+          headRoomCopied_ = 0;
           if (recordSkip_) {
             hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
                  hdfsScanTdb().recordDelimiter_,
@@ -628,6 +632,13 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
           break;
        case COPY_TAIL_TO_HEAD:
           {
+             BYTE *headRoomStartAddr;
+             headRoomCopied_ = bufEnd_ - (BYTE *)hdfsBufNextRow_;
+             if (retArray_[BUF_NO] == 0)
+                headRoomStartAddr = hdfsScanBuf_[1].buf_ - headRoomCopied_;
+             else
+                headRoomStartAddr = hdfsScanBuf_[0].buf_ - headRoomCopied_;
+             memcpy(headRoomStartAddr, hdfsBufNextRow_, headRoomCopied_);
              step_ = TRAF_HDFS_READ;
           }
           break;
@@ -1023,6 +1034,10 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
        case PROCESS_HDFS_ROW:
        {
+          if (!useLibhdfsScan_ && hdfsBufNextRow_ == NULL) {
+             step_ = TRAF_HDFS_READ;
+             break;
+          }
           exception_ = FALSE;
           nextStep_ = NOT_STARTED;
           debugPenultimatePrevRow_ = debugPrevRow_;
@@ -1066,9 +1081,20 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
             }
           else
             {
-              numBytesProcessedInRange_ +=
+              if (useLibhdfsScan_) {
+                 numBytesProcessedInRange_ +=
                      startOfNextRow - hdfsBufNextRow_;
-              hdfsBufNextRow_ = startOfNextRow;
+                 hdfsBufNextRow_ = startOfNextRow;
+              }
+              else {
+                 if ((BYTE *)startOfNextRow >= bufLogicalEnd_) {
+                    step_ = TRAF_HDFS_READ;
+                    hdfsBufNextRow_ = NULL;
+                 }
+                 else
+                    hdfsBufNextRow_ = startOfNextRow;
+              }
+            }

           if (exception_)
@@ -1691,7 +1717,7 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
   }
   else {
     sourceDataEnd = (const char *)bufEnd_;
-    endOfRequestedRange = (const char *)logicalBufEnd_;
+    endOfRequestedRange = NULL;
   }
   hdfsLoggingRow_ = hdfsBufNextRow_;
   if (asciiSourceTD->numAttrs() == 0)
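The TRAF_HDFS_READ hunk above is the heart of the range bookkeeping: the Java layer
deliberately reads past the end of each range so the last record can be completed, and the
C++ side must therefore clip row processing at a logical end inside the buffer. A minimal
sketch of that arithmetic (in Java, for consistency with the files below; the method name
and values are hypothetical, not Trafodion source):

    public class RangeEndSketch
    {
       // Offset within the buffer at which row processing must stop.
       //   bytesCompleted : bytes placed in the buffer by this read
       //   rangeBytesRead : cumulative bytes read for the current range
       //   bytesToRead    : length of the range assigned to this scan
       static long logicalEnd(long bytesCompleted, long rangeBytesRead, long bytesToRead)
       {
          long extraBytesRead = (rangeBytesRead > bytesToRead) ? rangeBytesRead - bytesToRead
                                                               : 0;
          return bytesCompleted - extraBytesRead;
       }

       public static void main(String[] args)
       {
          // A 100-byte range read with a 16-byte tail: row processing stops at 100.
          System.out.println(logicalEnd(116L, 116L, 100L));   // prints 100
       }
    }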
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/executor/ExHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h
index 62bb11e..2570a58 100644
--- a/core/sql/executor/ExHdfsScan.h
+++ b/core/sql/executor/ExHdfsScan.h
@@ -119,7 +119,7 @@ private:
         a) filename
         b) offset
         c) len
-     Java layer always reads more than the len by rangeTailIOSize_ to accomdate the record split
+     Java layer always reads more than the len by rangeTailIOSize_ to accommodate the record split
      2. Two ByteBuffer objects are also passed to HdfsScan object. These ByteBuffers are backed
         by 2 native buffers where the data is fetched. The buffer has a head room of size
         rangeTailIOSize_ and the data is always read after the head room.
@@ -347,11 +347,12 @@ protected:
   int retArray_[4];
   BYTE *bufBegin_;
   BYTE *bufEnd_;
-  BYTE *logicalBufEnd_;
+  BYTE *bufLogicalEnd_;
   long currRangeBytesRead_;
   int headRoomCopied_;
   int headRoom_;
   int prevRangeNum_;
+  int extraBytesRead_;
   NABoolean recordSkip_;
 };
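The header comment above describes the two native buffers, each with a head room of
rangeTailIOSize_ bytes in front of the read area, so the tail of one buffer can be copied in
front of the other and a record split across reads becomes contiguous. A self-contained
sketch of that layout; the sizes are assumed values, not the ones Trafodion uses:

    import java.nio.ByteBuffer;

    public class ScanBufferLayoutSketch
    {
       public static void main(String[] args)
       {
          int rangeTailIOSize = 16 * 1024;      // head room size (assumed value)
          int scanBufMaxSize  = 1 << 20;        // read area size (assumed value)
          ByteBuffer[] buf = new ByteBuffer[2];
          for (int i = 0; i < 2; i++) {
             buf[i] = ByteBuffer.allocateDirect(rangeTailIOSize + scanBufMaxSize);
             buf[i].position(rangeTailIOSize);  // reads always start after the head room
          }
          System.out.println("read area starts at offset " + buf[0].position());
       }
    }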
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 1af2c49..3b83c8f 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -87,31 +87,45 @@ public class HDFSClient

    class HDFSRead implements Callable
    {
-      int length_;
-
-      HDFSRead(int length)
+      HDFSRead()
       {
-         length_ = length;
       }

       public Object call() throws IOException
       {
          int bytesRead;
-         if (buf_.hasArray())
-            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
-         else
+         int totalBytesRead = 0;
+         if (! buf_.hasArray())
+            fsdis_.seek(pos_);
+         do
          {
-            buf_.limit(bufOffset_ + length_);
-            bytesRead = fsdis_.read(buf_);
-         }
-         return new Integer(bytesRead);
+            if (buf_.hasArray())
+               bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
+            else
+               bytesRead = fsdis_.read(buf_);
+            if (bytesRead == -1) {
+               isEOF_ = 1;
+               break;
+            }
+            if (bytesRead == 0)
+               break;
+            totalBytesRead += bytesRead;
+            if (totalBytesRead == bufLen_)
+               break;
+            bufOffset_ += bytesRead;
+            pos_ += bytesRead;
+            lenRemain_ -= bytesRead;
+         } while (lenRemain_ > 0);
+         return new Integer(totalBytesRead);
       }
    }

    public HDFSClient()
    {
    }
-
+
+   // This constructor enables the hdfs data to be read in another thread while the previously
+   // read buffer is being processed by the SQL engine
    public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
    {
       bufNo_ = bufNo;
@@ -127,44 +141,24 @@ public class HDFSClient
       len_ = length;
       if (buffer.hasArray())
          bufLen_ = buffer.array().length;
-      else
-      {
+      else {
          bufLen_ = buffer.capacity();
          buf_.position(0);
       }
       lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
-      if (lenRemain_ != 0)
-      {
-         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
-         future_ = executorService_.submit(new HDFSRead(readLength));
+      if (lenRemain_ != 0) {
+         future_ = executorService_.submit(new HDFSRead());
       }
    }

-   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
    {
       Integer retObject = 0;
       int bytesRead;
-      int readLength;
-
-      if (lenRemain_ == 0)
-         return 0;
       retObject = (Integer)future_.get();
       bytesRead = retObject.intValue();
-      if (bytesRead == -1)
-         return -1;
-      bufOffset_ += bytesRead;
-      pos_ += bytesRead;
-      lenRemain_ -= bytesRead;
-      if (bufOffset_ == bufLen_)
-         return bytesRead;
-      else if (bufOffset_ > bufLen_)
-         throw new IOException("Internal Error in trafHdfsRead ");
-      if (lenRemain_ == 0)
-         return bytesRead;
-      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
-      future_ = executorService_.submit(new HDFSRead(readLength));
       return bytesRead;
-   }
+   }

    public int getRangeNo()
    {
@@ -176,24 +170,6 @@ public class HDFSClient
       return isEOF_;
    }

-   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
-   {
-      int bytesRead;
-      int totalBytesRead = 0;
-      while (true) {
-         bytesRead = trafHdfsRead();
-         if (bytesRead == -1) {
-            isEOF_ = 1;
-            return totalBytesRead;
-         }
-         if (bytesRead == 0)
-            return totalBytesRead;
-         totalBytesRead += bytesRead;
-         if (totalBytesRead == bufLen_)
-            return totalBytesRead;
-      }
-   }
-
    boolean hdfsCreate(String fname , boolean compress) throws IOException
    {
       if (logger_.isDebugEnabled())
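The rewritten HDFSRead.call() above moves the whole fill loop into the worker thread, so one
Future now stands for one completely filled buffer rather than one chunk, and the old
caller-side loop could be deleted. The same pattern as a stand-alone sketch against a local
FileChannel (hypothetical; the real code reads an HDFS FSDataInputStream):

    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ReadFullySketch
    {
       // The fill loop runs inside the Callable: short reads are retried, EOF (-1)
       // or lack of progress ends the loop, and the total byte count is returned.
       static Callable<Integer> readFully(FileChannel ch, ByteBuffer buf, long pos)
       {
          return () -> {
             int total = 0;
             long offset = pos;
             while (buf.hasRemaining()) {
                int n = ch.read(buf, offset);   // positional read; may be short
                if (n <= 0)                     // -1 is EOF, 0 is no progress
                   break;
                total += n;
                offset += n;
             }
             return total;
          };
       }

       public static void main(String[] args) throws Exception
       {
          ExecutorService pool = Executors.newSingleThreadExecutor();
          try (FileChannel ch = FileChannel.open(Paths.get(args[0]), StandardOpenOption.READ)) {
             ByteBuffer buf = ByteBuffer.allocateDirect(1 << 20);
             Future<Integer> filled = pool.submit(readFully(ch, buf, 0L));
             System.out.println("bytes read: " + filled.get());  // one wait per buffer
          } finally {
             pool.shutdown();
          }
       }
    }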
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index 9fb145e..73ceda8 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -65,7 +65,6 @@ public class HdfsScan
    private long lenRemain_;
    private int lastBufCompleted_ = -1;
    private boolean scanCompleted_;
-   private boolean lastScanRangeScheduled_;

    class HdfsScanRange
    {
@@ -113,41 +112,44 @@ public class HdfsScan
       }
       if (hdfsScanRanges_.length > 0) {
          currRange_ = 0;
-         currPos_ = hdfsScanRanges_[0].pos_;
-         lenRemain_ = hdfsScanRanges_[0].len_;
-         hdfsScanRange(0);
+         currPos_ = hdfsScanRanges_[currRange_].pos_;
+         lenRemain_ = hdfsScanRanges_[currRange_].len_;
+         hdfsScanRange(0, 0);
       }
       scanCompleted_ = false;
-      lastScanRangeScheduled_ = false;
    }

-   public void hdfsScanRange(int bufNo) throws IOException
+   public void hdfsScanRange(int bufNo, int bytesCompleted) throws IOException
    {
-      if (logger_.isDebugEnabled())
-         logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo);
+      lenRemain_ -= bytesCompleted;
+      currPos_ += bytesCompleted;
       int readLength;
-      if (lenRemain_ > bufLen_[bufNo])
-         readLength = bufLen_[bufNo];
-      else
-         readLength = (int)lenRemain_;
-      hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
-      lenRemain_ -= readLength;
-      currPos_ += readLength;
-      if (lenRemain_ == 0) {
-         if (currRange_ == (hdfsScanRanges_.length-1))
-            lastScanRangeScheduled_ = true;
+      if (lenRemain_ <= 0) {
+         if (currRange_ == (hdfsScanRanges_.length-1)) {
+            scanCompleted_ = true;
+            return;
+         }
          else {
             currRange_++;
             currPos_ = hdfsScanRanges_[currRange_].pos_;
             lenRemain_ = hdfsScanRanges_[currRange_].len_;
          }
       }
+      if (lenRemain_ > bufLen_[bufNo])
+         readLength = bufLen_[bufNo];
+      else
+         readLength = (int)lenRemain_;
+      if (! scanCompleted_) {
+         if (logger_.isDebugEnabled())
+            logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo);
+         hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
+      }
    }

    public int[] trafHdfsRead() throws IOException, InterruptedException, ExecutionException
    {
       int[] retArray;
-      int byteCompleted;
+      int bytesRead;
       int bufNo;
       int rangeNo;
       int isEOF;
@@ -160,44 +162,41 @@ public class HdfsScan
       switch (lastBufCompleted_) {
          case -1:
          case 1:
-            byteCompleted = hdfsClient_[0].trafHdfsReadBuffer();
+            bytesRead = hdfsClient_[0].trafHdfsReadBuffer();
             bufNo = 0;
             rangeNo = hdfsClient_[0].getRangeNo();
             isEOF = hdfsClient_[0].isEOF();
             break;
          case 0:
-            byteCompleted = hdfsClient_[1].trafHdfsReadBuffer();
+            bytesRead = hdfsClient_[1].trafHdfsReadBuffer();
             bufNo = 1;
             rangeNo = hdfsClient_[1].getRangeNo();
             isEOF = hdfsClient_[1].isEOF();
             break;
          default:
             bufNo = -1;
-            byteCompleted = -1;
+            bytesRead = -1;
             rangeNo = -1;
             isEOF = 0;
       }
-      lastBufCompleted_ = bufNo;
-      retArray[0] = byteCompleted;
+      retArray[0] = bytesRead;
       retArray[1] = bufNo;
       retArray[2] = rangeNo;
       retArray[3] = isEOF;
       if (logger_.isDebugEnabled())
          logger_.debug(" Range No " + retArray[2] + " Buffer No " + retArray[1] + " Bytes Read " + retArray[0] + " isEOF " + retArray[3]);
+      lastBufCompleted_ = bufNo;
-      if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1)))
-         lastScanRangeScheduled_ = true;
-      if (lastScanRangeScheduled_) {
+      if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1))) {
          scanCompleted_ = true;
-         return retArray;
+         return retArray;
       }
       switch (lastBufCompleted_) {
          case 0:
-            hdfsScanRange(1);
+            hdfsScanRange(1, bytesRead);
             break;
          case 1:
-            hdfsScanRange(0);
+            hdfsScanRange(0, bytesRead);
             break;
          default:
            break;
       }
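The trafHdfsRead() hunk above alternates between hdfsClient_[0] and hdfsClient_[1]: it blocks
on the buffer that was scheduled earlier and immediately schedules a refill of the buffer
that was just consumed, so I/O and row processing overlap. A self-contained sketch of that
ping-pong pattern (all names and sizes here are hypothetical, not Trafodion source); the
test-driver hunk that follows exercises the real version end to end:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PingPongSketch
    {
       // Stands in for an HDFS read that fills one buffer.
       static int fill(byte[] b, int chunkNo)
       {
          java.util.Arrays.fill(b, (byte) chunkNo);
          return b.length;
       }

       public static void main(String[] args) throws Exception
       {
          ExecutorService pool = Executors.newSingleThreadExecutor();
          byte[][] buf = new byte[2][1024];
          @SuppressWarnings("unchecked")
          Future<Integer>[] pending = new Future[2];
          int chunks = 6;                          // pretend the scan has 6 chunks
          pending[0] = pool.submit(() -> fill(buf[0], 0));
          for (int i = 0; i < chunks; i++) {
             int cur = i % 2;
             int bytesRead = pending[cur].get();   // block on the buffer filled first
             if (i + 1 < chunks) {                 // refill the other buffer before
                final int next = 1 - cur;          // processing, so I/O overlaps work
                final int chunkNo = i + 1;
                pending[next] = pool.submit(() -> fill(buf[next], chunkNo));
             }
             System.out.println("processing chunk " + i + ": " + bytesRead + " bytes from buffer " + cur);
          }
          pool.shutdown();
       }
    }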
@@ -257,10 +256,20 @@ public class HdfsScan
       hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len);
       int[] retArray;
       int bytesCompleted;
+      ByteBuffer buf;
       while (true) {
          retArray = hdfsScan.trafHdfsRead();
          if (retArray == null)
             break;
+         System.out.println("Range No:" + retArray[2] + " Buf No:" + retArray[1] + " Bytes Completed:" + retArray[0] + " EOF:" + retArray[3]);
+         if (retArray[1] == 0)
+            buf = buf1;
+         else
+            buf = buf2;
+         buf.position(0);
+         for (int i = 0; i < 50; i++)
+            System.out.print(buf.get());
+         System.out.println("");
       }
       long time2 = System.currentTimeMillis();
       HdfsScan.shutdown();
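As a closing illustration, the COPY_TAIL_TO_HEAD step in the ExHdfsScan.cpp diff carries the
partial record left at the tail of one buffer into the head room in front of the other
buffer, so the record becomes contiguous once the next read lands after the head room. A
tiny, self-contained sketch with byte arrays (hypothetical data, not Trafodion source):

    import java.util.Arrays;

    public class TailToHeadSketch
    {
       public static void main(String[] args)
       {
          int headRoom = 8;
          byte[] bufA = "r1,done\nr2,par".getBytes();     // tail holds a partial record
          byte[] bufB = new byte[headRoom + 16];          // head room + read area

          int tailStart = "r1,done\n".length();           // start of the unfinished row
          int headRoomCopied = bufA.length - tailStart;   // bytes to carry over

          // copy the tail immediately in front of buffer B's read area
          System.arraycopy(bufA, tailStart, bufB, headRoom - headRoomCopied, headRoomCopied);
          // the next read fills the read area, completing the record
          byte[] rest = "tial\nr3,done\n".getBytes();
          System.arraycopy(rest, 0, bufB, headRoom, rest.length);

          int begin = headRoom - headRoomCopied;          // bufBegin_ analogue
          System.out.println(new String(Arrays.copyOfRange(bufB, begin, headRoom + rest.length)));
          // prints "r2,partial" and "r3,done" on separate lines
       }
    }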