trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/9] trafodion git commit: [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text format hive tables
Date Fri, 16 Feb 2018 20:00:38 GMT
[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text format hive tables

Part-4 changes

Hive scan with ESPs processes the correct number of rows now
Hdfs Scan Statistics are populated

Pending changes:
Hdfs Scan with compression needs to be done.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/7066e3e4
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/7066e3e4
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/7066e3e4

Branch: refs/heads/master
Commit: 7066e3e478d0de7c1af5dfea298e4d6a10fb9596
Parents: 202a040
Author: selvaganesang <selva.govindarajan@esgyn.com>
Authored: Tue Feb 6 04:48:01 2018 +0000
Committer: selvaganesang <selva.govindarajan@esgyn.com>
Committed: Tue Feb 6 04:48:01 2018 +0000

----------------------------------------------------------------------
 core/sql/executor/ExHdfsScan.cpp                | 82 ++++++++++----------
 core/sql/executor/HdfsClient_JNI.cpp            | 59 ++++++++++----
 core/sql/executor/HdfsClient_JNI.h              |  8 +-
 .../main/java/org/trafodion/sql/HdfsScan.java   | 16 ++--
 4 files changed, 103 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/7066e3e4/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index 730f0dc..f8ec9a1 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -121,6 +121,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   , loggingFileName_(NULL)
   , hdfsClient_(NULL)
   , hdfsScan_(NULL)
+  , hdfsStats_(NULL)
   , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries())
 {
   Space * space = (glob ? glob->getSpace() : 0);
@@ -333,12 +334,12 @@ ExOperStats * ExHdfsScanTcb::doAllocateStatsEntry(CollHeap *heap,
   else
      ss = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->getStatement()->getStmtStats();

   
-  ExHdfsScanStats *hdfsScanStats = new(heap) ExHdfsScanStats(heap,
+  hdfsStats_ = new(heap) ExHdfsScanStats(heap,
 				   this,
 				   tdb);
   if (ss != NULL) 
-     hdfsScanStats->setQueryId(ss->getQueryId(), ss->getQueryIdLen());
-  return hdfsScanStats;
+     hdfsStats_->setQueryId(ss->getQueryId(), ss->getQueryIdLen());
+  return hdfsStats_;
 }
 
 void ExHdfsScanTcb::registerSubtasks()
@@ -419,12 +420,6 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	  {
 	    matches_ = 0;
 	    
-	    hdfsStats_ = NULL;
-	    if (getStatsEntry())
-	      hdfsStats_ = getStatsEntry()->castToExHdfsScanStats();
-
-	    ex_assert(hdfsStats_, "hdfs stats cannot be null");
-
 	    beginRangeNum_ = -1;
 	    numRanges_ = -1;
 	    hdfsOffset_ = 0;
@@ -557,7 +552,8 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
              if (hdfsScan_ != NULL)
                 NADELETE(hdfsScan_, HdfsScan, getHeap());
              hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,

-                            &hdfsFileInfoListAsArray_, hdfsScanTdb().rangeTailIOSize_,
hdfsScanRetCode);
+                            &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,

+                            hdfsStats_, hdfsScanRetCode);
              if (hdfsScanRetCode != HDFS_SCAN_OK)
              {
                 setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", 
@@ -578,7 +574,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
           break;
         case TRAF_HDFS_READ:
           {
-             hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), retArray_,
sizeof(retArray_)/sizeof(int));
+             hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_,
retArray_, sizeof(retArray_)/sizeof(int));
              if (hdfsScanRetCode == HDFS_SCAN_EOR) {
                 step_ = DONE;
                 break;
@@ -589,7 +585,6 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 step_ = HANDLE_ERROR_AND_DONE;
                 break;
              } 
-             // Assign the starting address of the buffer
              hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
              bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
              if (retArray_[RANGE_NO] != prevRangeNum_) {  
@@ -608,8 +603,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); 
              else
                 extraBytesRead_ = 0;
+             // headRoom_ is the number of extra bytes read (rangeTailIOSize)
+             // If EOF is reached while reading the range and the extraBytes read
+             // is less than headRoom_, then process all the data till EOF 
+             if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+                extraBytesRead_ = 0;
              bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED]
- extraBytesRead_;
- 
              prevRangeNum_ = retArray_[RANGE_NO];
              headRoomCopied_ = 0;
              if (recordSkip_) {
@@ -633,7 +632,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
         case COPY_TAIL_TO_HEAD:
           {
              BYTE *headRoomStartAddr;
-             headRoomCopied_ = bufEnd_ - (BYTE *)hdfsBufNextRow_;
+             headRoomCopied_ = bufLogicalEnd_ - (BYTE *)hdfsBufNextRow_;
              if (retArray_[BUF_NO] == 0)
                 headRoomStartAddr = hdfsScanBuf_[1].buf_ - headRoomCopied_;
              else
@@ -1087,9 +1086,11 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	       hdfsBufNextRow_ = startOfNextRow;
              } 
              else {
-                if ((BYTE *)startOfNextRow >= bufLogicalEnd_) {
+                if ((BYTE *)startOfNextRow > bufLogicalEnd_) {
                    step_ = TRAF_HDFS_READ;
                    hdfsBufNextRow_ = NULL;
+	           if (!exception_)
+	               break;
                 }
                 else
 	          hdfsBufNextRow_ = startOfNextRow;
@@ -1223,8 +1224,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 if (hdfsScanTdb().continueOnError())
                 {
                   if ((pentry_down->downState.request == ex_queue::GET_N) &&
-                      (pentry_down->downState.requestValue == matches_))
-                    step_ = CLOSE_HDFS_CURSOR;
+                      (pentry_down->downState.requestValue == matches_)) {
+                     if (useLibhdfsScan_)
+                        step_ = CLOSE_HDFS_CURSOR;
+                     else
+                        step_ = DONE;
+                  }
                   else
                     step_ = PROCESS_HDFS_ROW;
 
@@ -1290,8 +1295,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	        if (hdfsScanTdb().continueOnError())
 	        {
 	          if ((pentry_down->downState.request == ex_queue::GET_N) &&
-	              (pentry_down->downState.requestValue == matches_))
-	            step_ = CLOSE_FILE;
+	              (pentry_down->downState.requestValue == matches_)) {
+                     if (useLibhdfsScan_)
+                        step_ = CLOSE_HDFS_CURSOR;
+                     else
+                        step_ = DONE;
+                  }
 	          else
 	            step_ = PROCESS_HDFS_ROW;
 
@@ -1332,7 +1341,10 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	          pentry_down->getDiagsArea()->
 	              mergeAfter(*up_entry->getDiagsArea());
 	          up_entry->setDiagsArea(NULL);
-	          step_ = HANDLE_ERROR_WITH_CLOSE;
+                  if (useLibhdfsScan_)
+	             step_ = HANDLE_ERROR_WITH_CLOSE;
+	          else
+	             step_ = HANDLE_ERROR;
 	          break;
 	        }
 	      }
@@ -1551,15 +1563,17 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	      }
 	    up_entry->upState.status = ex_queue::Q_SQLERROR;
 	    qparent_.up->insert();
-	    
-            if (step_ == HANDLE_ERROR_WITH_CLOSE)
-               step_ = CLOSE_HDFS_CURSOR;
-            else if (step_ == HANDLE_ERROR_AND_DONE)
-              step_ = DONE;
-            else
-	       step_ = ERROR_CLOSE_FILE;
+          
+            if (useLibhdfsScan_) {
+               if (step_ == HANDLE_ERROR_WITH_CLOSE)
+                  step_ = CLOSE_HDFS_CURSOR;
+               else if (step_ == HANDLE_ERROR_AND_DONE)
+                  step_ = DONE;
+               else
+	          step_ = ERROR_CLOSE_FILE;
+            } else
+               step_ = DONE;
 	    break;
-           
 	  }
 
 	case CLOSE_FILE:
@@ -2280,12 +2294,6 @@ ExWorkProcRetcode ExOrcScanTcb::work()
 	  {
 	    matches_ = 0;
 	    
-	    hdfsStats_ = NULL;
-	    if (getStatsEntry())
-	      hdfsStats_ = getStatsEntry()->castToExHdfsScanStats();
-
-	    ex_assert(hdfsStats_, "hdfs stats cannot be null");
-
 	    beginRangeNum_ = -1;
 	    numRanges_ = -1;
 
@@ -2629,12 +2637,6 @@ ExWorkProcRetcode ExOrcFastAggrTcb::work()
 	  {
 	    matches_ = 0;
 
-	    hdfsStats_ = NULL;
-	    if (getStatsEntry())
-	      hdfsStats_ = getStatsEntry()->castToExHdfsScanStats();
-            
-	    ex_assert(hdfsStats_, "hdfs stats cannot be null");
-
             orcAggrTdb().getHdfsFileInfoList()->position();
 
             rowCount_ = 0;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/7066e3e4/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp
index 63c4ac1..8f2845a 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -68,7 +68,7 @@ HDFS_Scan_RetCode HdfsScan::init()
     JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
     JavaMethods_[JM_CTOR      ].jm_signature = "()V";
     JavaMethods_[JM_SET_SCAN_RANGES].jm_name      = "setScanRanges";
-    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J)V";
+    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J[I)V";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_name      = "trafHdfsRead";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
    
@@ -90,7 +90,8 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
 
 /////////////////////////////////////////////////////////////////////////////
 HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,
 int scanBufSize,
-      HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize)
+      HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,
+      ExHdfsScanStats *hdfsStats)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called.");
 
@@ -113,12 +114,16 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN
    jobjectArray j_filenames = NULL;
    jlongArray j_offsets = NULL;
    jlongArray j_lens = NULL;  
+   jintArray j_rangenums = NULL;
    HdfsFileInfo *hdfo;
    jstring j_obj;
 
    HDFS_Scan_RetCode hdfsScanRetCode =  HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
    int arrayLen = hdfsFileInfoArray->entries();
-   for (int i = 0; i < arrayLen; i++) {
+   
+   for (Int32 i = beginRangeNum, rangeCount=0; i < arrayLen; i++, rangeCount++) {
+       if (rangeCount >= numRanges)
+           break;
        hdfo = hdfsFileInfoArray->at(i);
        j_obj = jenv_->NewStringUTF(hdfo->fileName());
        if (jenv_->ExceptionCheck()) {
@@ -126,35 +131,54 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN
           return hdfsScanRetCode;
        }
        if (j_filenames == NULL) {
-          j_filenames = jenv_->NewObjectArray(arrayLen, jenv_->GetObjectClass(j_obj),
NULL);
+          j_filenames = jenv_->NewObjectArray(numRanges, jenv_->GetObjectClass(j_obj),
NULL);
           if (jenv_->ExceptionCheck()) {
              jenv_->PopLocalFrame(NULL);
              return hdfsScanRetCode;
           }
        }
-       jenv_->SetObjectArrayElement(j_filenames, i, (jobject)j_obj);
+       jenv_->SetObjectArrayElement(j_filenames, rangeCount, (jobject)j_obj);
        jenv_->DeleteLocalRef(j_obj);
+
        if (j_offsets == NULL) {
-          j_offsets = jenv_->NewLongArray(arrayLen);
+          j_offsets = jenv_->NewLongArray(numRanges);
           if (jenv_->ExceptionCheck()) {
              jenv_->PopLocalFrame(NULL);
              return hdfsScanRetCode;
           }
        }
        long offset = hdfo->getStartOffset(); 
-       jenv_->SetLongArrayRegion(j_offsets, i, 1, &offset);
+       jenv_->SetLongArrayRegion(j_offsets, rangeCount, 1, &offset);
+
        if (j_lens == NULL) {
-          j_lens = jenv_->NewLongArray(arrayLen);
+          j_lens = jenv_->NewLongArray(numRanges);
           if (jenv_->ExceptionCheck()) {
              jenv_->PopLocalFrame(NULL);
              return hdfsScanRetCode;
           }
        }
        long len = hdfo->getBytesToRead()+rangeTailIOSize;
-       jenv_->SetLongArrayRegion(j_lens, i, 1, &len);
+       jenv_->SetLongArrayRegion(j_lens, rangeCount, 1, &len);
+
+       if (j_rangenums == NULL) {
+          j_rangenums = jenv_->NewIntArray(numRanges);
+          if (jenv_->ExceptionCheck()) {
+             jenv_->PopLocalFrame(NULL);
+             return hdfsScanRetCode;
+          }
+       }
+       jint tdbRangeNum = i;
+       jenv_->SetIntArrayRegion(j_rangenums, rangeCount, 1, &tdbRangeNum);
    } 
 
-   jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1,
j_buf2, j_filenames, j_offsets, j_lens);
+   if (hdfsStats)
+       hdfsStats->getHdfsTimer().start();
+   tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name;
+   jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1,
j_buf2, j_filenames, j_offsets, j_lens, j_rangenums);
+   if (hdfsStats) {
+      hdfsStats->incMaxHdfsIOTime(hdfsStats->getHdfsTimer().stop());
+      hdfsStats->incHdfsCalls();
+   }
 
    if (jenv_->ExceptionCheck()) {
       getExceptionDetails();
@@ -167,7 +191,8 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN
 }
 
 HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,
 int scanBufSize,
-      HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize, HDFS_Scan_RetCode &hdfsScanRetCode)
+      HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,

+      ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called.");
 
@@ -179,7 +204,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfs
        hdfsScanRetCode = hdfsScan->init();
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
           hdfsScanRetCode = hdfsScan->setScanRanges(heap, hdfsScanBuf, scanBufSize, 
-                    hdfsFileInfoArray, rangeTailIOSize); 
+                    hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize, hdfsStats);

        if (hdfsScanRetCode != HDFS_SCAN_OK) {
           NADELETE(hdfsScan, HdfsScan, heap);
           hdfsScan = NULL;
@@ -189,14 +214,22 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfs
 }
 
 
-HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, int retArray[], short arrayLen)
+HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, ExHdfsScanStats *hdfsStats, int retArray[],
short arrayLen)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::trafHdfsRead() called.");
 
    if (initJNIEnv() != JOI_OK)
      return HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM;
 
+   if (hdfsStats)
+       hdfsStats->getHdfsTimer().start();
+   tsRecentJMFromJNI = JavaMethods_[JM_TRAF_HDFS_READ].jm_full_name;
    jintArray j_retArray = (jintArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_TRAF_HDFS_READ].methodID);
+   if (hdfsStats) {
+      hdfsStats->incMaxHdfsIOTime(hdfsStats->getHdfsTimer().stop());
+      hdfsStats->incHdfsCalls();
+   }
+
    if (jenv_->ExceptionCheck()) {
       getExceptionDetails();
       logError(CAT_SQL_HDFS, __FILE__, __LINE__);

http://git-wip-us.apache.org/repos/asf/trafodion/blob/7066e3e4/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h
index 0426ebc..5854d59 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -58,12 +58,14 @@ public:
   static char* getErrorText(HDFS_Scan_RetCode errEnum);
 
   static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int
scanBufSize, 
-            HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize, HDFS_Scan_RetCode
&hdfsScanRetCode);
+            HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int
rangeTailIOSize,
+            ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode);
 
   HDFS_Scan_RetCode setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,
int scanBufSize, 
-            HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize);
+            HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, 
+            int rangeTailIOSize, ExHdfsScanStats *hdfsStats);
 
-  HDFS_Scan_RetCode trafHdfsRead(NAHeap *heap, int retArray[], short arrayLen);
+  HDFS_Scan_RetCode trafHdfsRead(NAHeap *heap, ExHdfsScanStats *hdfsStats, int retArray[],
short arrayLen);
 
 private:
   enum JAVA_METHODS {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/7066e3e4/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index 73ceda8..f3d505d 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -71,12 +71,14 @@ public class HdfsScan
       String filename_;
       long pos_;
       long len_;
+      int tdbRangeNum_;
       
-      HdfsScanRange(String filename, long pos, long len)
+      HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
       {
          filename_ = filename;
          pos_ = pos;
          len_ = len;
+         tdbRangeNum_ = tdbRangeNum;
       }
    }
    
@@ -91,7 +93,7 @@ public class HdfsScan
    {
    }
 
-   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[],
long len[]) throws IOException
+   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[],
long len[], int rangeNum[]) throws IOException
    {
       buf_ = new ByteBuffer[2];
       bufLen_ = new int[2];
@@ -108,7 +110,7 @@ public class HdfsScan
       hdfsClient_ = new HDFSClient[2];
       hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
       for (int i = 0; i < filename.length; i++) {
-         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i]);
+         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]);
       }
       if (hdfsScanRanges_.length > 0) {
          currRange_ = 0;
@@ -141,8 +143,8 @@ public class HdfsScan
          readLength = (int)lenRemain_;
       if (! scanCompleted_) {
          if (logger_.isDebugEnabled())
-            logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + "
BufNo " + bufNo); 
-         hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_,
buf_[bufNo], currPos_, readLength);
+            logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + "
LenRemain " + lenRemain_ + " BufNo " + bufNo); 
+         hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].tdbRangeNum_,
hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
       }
    } 
    
@@ -233,6 +235,7 @@ public class HdfsScan
       String fileName[] = new String[file_status.length * split];
       long pos[] = new long[file_status.length * split];
       long len[] = new long[file_status.length * split];
+      int range[] = new int[file_status.length * split];
       for (int i = 0 ; i < file_status.length * split; i++) {
          Path filePath = file_status[i].getPath();
          long fileLen = file_status[i].getLen(); 
@@ -245,6 +248,7 @@ public class HdfsScan
             fileName[i] = filePath.toString();
             pos[i] = splitPos + (splitLen * j);
             len[i] = splitLen;
+            range[i] = i;
             if (j == (split-1))
                len[i] = fileLen - (splitLen *(j));
             System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + len[i]); 
@@ -253,7 +257,7 @@ public class HdfsScan
       }
       long time1 = System.currentTimeMillis();
       HdfsScan hdfsScan = new HdfsScan();
-      hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len);
+      hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len, range);
       int[] retArray;
       int bytesCompleted;
       ByteBuffer buf;


Mime
View raw message