trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/4] trafodion git commit: [TRAFODION-3065] Trafodion to support compressed Hive Text formatted tables
Date Thu, 10 May 2018 21:08:21 GMT
[TRAFODION-3065] Trafodion to support compressed Hive Text formatted tables

Compressed text files are now supported via the new implementation using
HDFS Java APIs. When Hadoop is not configured to support a particular type
of compression, an error is thrown.

[TRAFODION-2982] JNI HDFS interface should support varied sized large buffers for read/write
A new CQD HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB is introduced to chunk
the reads and writes when a Java byte array is involved.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/96cab4dd
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/96cab4dd
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/96cab4dd

Branch: refs/heads/master
Commit: 96cab4ddd086a59ebc0eab8ac4a93ee3cf315aac
Parents: f216cdb
Author: selvaganesang <selva.govindarajan@esgyn.com>
Authored: Wed May 9 00:36:04 2018 +0000
Committer: selvaganesang <selva.govindarajan@esgyn.com>
Committed: Wed May 9 18:51:31 2018 +0000

----------------------------------------------------------------------
 core/sql/comexe/ComTdbFastTransport.cpp         |   1 +
 core/sql/comexe/ComTdbFastTransport.h           |   7 +-
 core/sql/comexe/ComTdbHdfsScan.cpp              |   1 +
 core/sql/comexe/ComTdbHdfsScan.h                |   5 +
 core/sql/executor/ExHdfsScan.cpp                |  16 ++-
 core/sql/executor/HdfsClient_JNI.cpp            | 126 +++++++++++------
 core/sql/executor/HdfsClient_JNI.h              |   9 +-
 .../sql/executor/org_trafodion_sql_HDFSClient.h |  31 ++++
 core/sql/exp/ExpLOBinterface.h                  |  10 ++
 core/sql/generator/GenFastTransport.cpp         |   4 +
 core/sql/generator/GenRelScan.cpp               |   5 +-
 core/sql/regress/hive/DIFF002.KNOWN             |  14 ++
 core/sql/regress/hive/FILTER002                 |  33 +++++
 core/sql/sqlcomp/DefaultConstants.h             |   5 +
 core/sql/sqlcomp/nadefaults.cpp                 |   1 +
 .../main/java/org/trafodion/sql/HDFSClient.java | 140 +++++++++++++++----
 .../main/java/org/trafodion/sql/HdfsScan.java   |  34 ++++-
 17 files changed, 354 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.cpp b/core/sql/comexe/ComTdbFastTransport.cpp
index 49d830e..90f635f 100644
--- a/core/sql/comexe/ComTdbFastTransport.cpp
+++ b/core/sql/comexe/ComTdbFastTransport.cpp
@@ -99,6 +99,7 @@ ComTdbFastExtract::ComTdbFastExtract(
   hdfsReplication_(replication),
   ioTimeout_(ioTimeout),
   childDataRowLen_(childDataRowLen),
+  hdfsIoByteArraySize_(0),
   modTSforDir_(-1)
 {
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbFastTransport.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.h b/core/sql/comexe/ComTdbFastTransport.h
index 37c01da..0666953 100644
--- a/core/sql/comexe/ComTdbFastTransport.h
+++ b/core/sql/comexe/ComTdbFastTransport.h
@@ -369,6 +369,10 @@ public:
   void setModTSforDir(Int64 v) { modTSforDir_ = v; }
   Int64 getModTSforDir() const { return modTSforDir_; }
 
+  void setHdfsIoByteArraySize(int size)             
+    { hdfsIoByteArraySize_ = size; }                          
+  UInt16 getHdfsIoByteArraySize()                             
+    { return hdfsIoByteArraySize_; }
 protected:
   NABasicPtr   targetName_;                                  // 00 - 07
   NABasicPtr   delimiter_;                                   // 08 - 15
@@ -395,9 +399,10 @@ protected:
   UInt16       filler_;                                      // 130 - 131
   UInt32       childDataRowLen_;                             // 132 - 135
   Int64        modTSforDir_;                                 // 136 - 143
+  UInt16       hdfsIoByteArraySize_;                         // 144 - 147 
 
   // Make sure class size is a multiple of 8
-  char fillerComTdbFastTransport_[8];                        // 144 - 151
+  char fillerComTdbFastTransport_[4];                        // 148 - 151
 
 };
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.cpp b/core/sql/comexe/ComTdbHdfsScan.cpp
index a0bf5c1..f5e2907 100755
--- a/core/sql/comexe/ComTdbHdfsScan.cpp
+++ b/core/sql/comexe/ComTdbHdfsScan.cpp
@@ -121,6 +121,7 @@ ComTdbHdfsScan::ComTdbHdfsScan(
   hdfsRootDir_(hdfsRootDir),
   modTSforDir_(modTSforDir),
   numOfPartCols_(numOfPartCols),
+  hdfsIoByteArraySize_(0),
   hdfsDirsToCheck_(hdfsDirsToCheck)
 {};
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h
index ea995fb..f9a0afd 100755
--- a/core/sql/comexe/ComTdbHdfsScan.h
+++ b/core/sql/comexe/ComTdbHdfsScan.h
@@ -136,6 +136,7 @@ class ComTdbHdfsScan : public ComTdb
   UInt16 origTuppIndex_;                                      // 188 - 189
   char fillersComTdbHdfsScan1_[2];                            // 190 - 191
   NABasicPtr nullFormat_;                                     // 192 - 199
+  UInt16 hdfsIoByteArraySize_;                                // 198 - 199
 
   // next 4 params are used to check if data under hdfsFileDir
   // was modified after query was compiled.
@@ -362,6 +363,10 @@ public:
   Queue * hdfsDirsToCheck() { return hdfsDirsToCheck_; }
  
   char *hdfsRootDir() { return hdfsRootDir_; }
+  void setHdfsIoByteArraySize(int size)
+    { hdfsIoByteArraySize_ = size; }
+  UInt16 getHdfsIoByteArraySize() 
+    { return hdfsIoByteArraySize_; }
 };
 
 inline ComTdb * ComTdbHdfsScan::getChildTdb()

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index 97697f3..e5d73dc 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -129,7 +129,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   Space * space = (glob ? glob->getSpace() : 0);
   CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
   useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
-  if (isSequenceFile() || hdfsScanTdb.isCompressedFile())
+  if (isSequenceFile())
      useLibhdfsScan_ = TRUE;
   lobGlob_ = NULL;
   hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
@@ -569,6 +569,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 break;
              } 
              hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,

+                            hdfsScanTdb().hdfsIoByteArraySize_, 
                             &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,

                             hdfsStats_, hdfsScanRetCode);
              if (hdfsScanRetCode != HDFS_SCAN_OK) {
@@ -602,6 +603,11 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 break;
              } 
              hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+             if (retArray_[BYTES_COMPLETED] == 0) {
+                ex_assert(headRoomCopied_ == 0, "Internal Error in HdfsScan");
+                step_ = TRAF_HDFS_READ;
+                break;  
+             }
              bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
              if (retArray_[RANGE_NO] != prevRangeNum_) {  
                 currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
@@ -624,13 +630,9 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); 
              else
                 extraBytesRead_ = 0;
+             ex_assert(extraBytesRead_ >= 0, "Negative number of extraBytesRead");
              // headRoom_ is the number of extra bytes to be read (rangeTailIOSize)
              // If the whole range fits in one buffer, it is needed to process rows till
EOF for the last range alone.
-/*
-             if (retArray_[IS_EOF] && (extraBytesRead_ < headRoom_)  
-                   && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1)))
-                 extraBytesRead_ = 0;
-*/
              if (numFiles_ <= 1) {
                 if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ &&
(retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1)))
                    extraBytesRead_ = 0;
@@ -2048,7 +2050,7 @@ void ExHdfsScanTcb::computeRangesAtRuntime()
         }
       else
         e->bytesToRead_ = (Int64) fileInfos[h].mSize;
-
+      e->compressionMethod_  = 0;
       hdfsFileInfoListAsArray_.insertAt(h, e);
     }
 }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp
index 0622b50..5b8e850 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -26,6 +26,7 @@
 #include "Context.h"
 #include "jni.h"
 #include "HdfsClient_JNI.h"
+#include "org_trafodion_sql_HDFSClient.h"
 
 // ===========================================================================
 // ===== Class HdfsScan
@@ -83,7 +84,7 @@ HDFS_Scan_RetCode HdfsScan::init()
     JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
     JavaMethods_[JM_CTOR      ].jm_signature = "()V";
     JavaMethods_[JM_SET_SCAN_RANGES].jm_name      = "setScanRanges";
-    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J[I)V";
+    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;S[Ljava/lang/String;[J[J[I[S)V";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_name      = "trafHdfsRead";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
     JavaMethods_[JM_STOP].jm_name      = "stop";
@@ -106,7 +107,7 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
 }
 
 /////////////////////////////////////////////////////////////////////////////
-HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,  int
scanBufSize,
+HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,  int
scanBufSize, short hdfsIoByteArraySize,
       HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called.");
@@ -138,10 +139,12 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfsScan
       jenv_->PopLocalFrame(NULL);
       return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
    }
+   jshort j_hdfsIoByteArraySize = hdfsIoByteArraySize;
    jobjectArray j_filenames = NULL;
    jlongArray j_offsets = NULL;
    jlongArray j_lens = NULL;  
    jintArray j_rangenums = NULL;
+   jshortArray j_compress = NULL;
    HdfsFileInfo *hdfo;
    jstring j_obj;
 
@@ -184,7 +187,11 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfsScan
              return hdfsScanRetCode;
           }
        }
-       long len = hdfo->getBytesToRead()+rangeTailIOSize;
+       long len;
+       if (hdfo->getBytesToRead() > (LONG_MAX-rangeTailIOSize))
+           len  = LONG_MAX;
+       else
+           len  = hdfo->getBytesToRead()+rangeTailIOSize;
        jenv_->SetLongArrayRegion(j_lens, rangeCount, 1, &len);
 
        if (j_rangenums == NULL) {
@@ -196,12 +203,24 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfsScan
        }
        jint tdbRangeNum = i;
        jenv_->SetIntArrayRegion(j_rangenums, rangeCount, 1, &tdbRangeNum);
+
+       if (j_compress == NULL) {
+          j_compress = jenv_->NewShortArray(numRanges);
+          if (jenv_->ExceptionCheck()) {
+             jenv_->PopLocalFrame(NULL);
+             return hdfsScanRetCode;
+          }
+       }
+       short compressionMethod = (short)hdfo->getCompressionMethod();
+       //ex_assert(compressionMethod >= 0 && compressionMethod <= ComCompressionInfo::LZOP,
"Illegal CompressionMethod Value");
+       jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, &compressionMethod);
    } 
 
    if (hdfsStats_ != NULL)
        hdfsStats_->getHdfsTimer().start();
    tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name;
-   jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1,
j_buf2, j_filenames, j_offsets, j_lens, j_rangenums);
+   jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1,
j_buf2, j_hdfsIoByteArraySize, 
+                      j_filenames, j_offsets, j_lens, j_rangenums, j_compress);
    if (hdfsStats_ != NULL) {
       hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
       hdfsStats_->incHdfsCalls();
@@ -216,7 +235,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfsScan
 }
 
 HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,
 int scanBufSize,
-      HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,

+      short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum,
Int32 numRanges, int rangeTailIOSize, 
       ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called.");
@@ -228,7 +247,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfs
    if (hdfsScan != NULL) {
        hdfsScanRetCode = hdfsScan->init();
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
-          hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, 
+          hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySize,
 
                     hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize); 
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
           hdfsScan->setHdfsStats(hdfsStats);
@@ -359,7 +378,7 @@ void HdfsClient::deleteHdfsFileInfo()
    hdfsFileInfo_ = NULL;
 }
 
-HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode)
+HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode, short hdfsIoByteArraySize)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called.");
 
@@ -369,8 +388,10 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats,
HD
    HdfsClient *hdfsClient = new (heap) HdfsClient(heap);
    if (hdfsClient != NULL) {
        retCode = hdfsClient->init();
-       if (retCode == HDFS_CLIENT_OK) 
+       if (retCode == HDFS_CLIENT_OK) {
           hdfsClient->setHdfsStats(hdfsStats);
+          hdfsClient->setIoByteArraySize(hdfsIoByteArraySize);
+       }
        else {
           NADELETE(hdfsClient, HdfsClient, heap);
           hdfsClient = NULL;
@@ -574,41 +595,50 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode
&hd
      hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
      return 0;
   }
-
-  //Write the requisite bytes into the file
-  jbyteArray jbArray = jenv_->NewByteArray( len);
-  if (!jbArray) {
-    GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM));
-    jenv_->PopLocalFrame(NULL);
-    hdfsClientRetcode =  HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM;
-    return 0;
-  }
-  jenv_->SetByteArrayRegion(jbArray, 0, len, (const jbyte*)data);
-
-  if (hdfsStats_ != NULL)
-     hdfsStats_->getHdfsTimer().start();
-
-  tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name;
-  // Java method returns the cumulative bytes written
-  jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID,
jbArray);
-
-  if (hdfsStats_ != NULL) {
-      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
-      hdfsStats_->incHdfsCalls();
-  }
-  if (jenv_->ExceptionCheck())
+  Int64 lenRemain = len;
+  Int64 writeLen;
+  Int64 chunkLen = (ioByteArraySize_ > 0 ? ioByteArraySize_ * 1024 : 0);
+  Int64 offset = 0;
+  do 
   {
-    getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()");
-    jenv_->PopLocalFrame(NULL);
-    hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
-    return 0;
-  }
-
+     if ((chunkLen > 0) && (lenRemain > chunkLen))
+        writeLen = chunkLen; 
+     else
+        writeLen = lenRemain;
+     //Write the requisite bytes into the file
+     jbyteArray jbArray = jenv_->NewByteArray(writeLen);
+     if (!jbArray) {
+        GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM));
+        jenv_->PopLocalFrame(NULL);
+        hdfsClientRetcode =  HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM;
+        return 0;
+     }
+     jenv_->SetByteArrayRegion(jbArray, 0, writeLen, (const jbyte*)(data+offset));
+
+     if (hdfsStats_ != NULL)
+         hdfsStats_->getHdfsTimer().start();
+
+     tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name;
+     // Java method returns the cumulative bytes written
+     jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID,
jbArray);
+
+     if (hdfsStats_ != NULL) {
+         hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+         hdfsStats_->incHdfsCalls();
+     }
+     if (jenv_->ExceptionCheck())
+     {
+        getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()");
+        jenv_->PopLocalFrame(NULL);
+        hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
+        return 0;
+     }
+     lenRemain -= writeLen;
+     offset += writeLen;
+  } while (lenRemain > 0);
   jenv_->PopLocalFrame(NULL);
   hdfsClientRetcode = HDFS_CLIENT_OK;
-  Int32 bytesWritten = totalBytesWritten - totalBytesWritten_;
-  totalBytesWritten_ = totalBytesWritten;
-  return bytesWritten; 
+  return len; 
 }
 
 Int32 HdfsClient::hdfsRead(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode)
@@ -1018,6 +1048,22 @@ jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus
    return (jint) retcode;
 }
 
+JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_copyToByteBuffer
+  (JNIEnv *jenv, jobject j_obj, jobject j_buf, jint offset, jbyteArray j_bufArray, jint copyLen)
+{
+   void *bufBacking;
+   
+   bufBacking =  jenv->GetDirectBufferAddress(j_buf);
+   if (bufBacking == NULL)
+      return -1;
+   jlong capacity = jenv->GetDirectBufferCapacity(j_buf);
+   jbyte *byteBufferAddr = (jbyte *)bufBacking + offset; 
+   if ((offset + copyLen) > capacity)
+      return -2; 
+   jenv->GetByteArrayRegion(j_bufArray, 0, copyLen, byteBufferAddr);
+   return 0;
+}
+
 #ifdef __cplusplus
 }
 #endif

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h
index 6f68f4d..a85c590 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -66,11 +66,11 @@ public:
   // Get the error description.
   static char* getErrorText(HDFS_Scan_RetCode errEnum);
 
-  static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int
scanBufSize, 
+  static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int
scanBufSize, short hdfsIoByteArraySize, 
             HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int
rangeTailIOSize,
             ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode);
 
-  HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,

+  HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
short hdfsIoByteArraySize, 
             HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, 
             int rangeTailIOSize);
 
@@ -169,9 +169,11 @@ public:
   }
  
   ~HdfsClient();
-  static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode);
+  static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode, short hdfsIoByteArraySize = 0);
   static HdfsClient *getInstance();
   static void deleteInstance();
+  void setIoByteArraySize(short size)
+      { ioByteArraySize_ = size; }
 
   // Get the error description.
   static char* getErrorText(HDFS_Client_RetCode errEnum);
@@ -224,6 +226,7 @@ private:
   int numFiles_;
   char *path_;
   Int64 totalBytesWritten_;
+  short ioByteArraySize_;
   ExHdfsScanStats *hdfsStats_;
   static jclass javaClass_;
   static JavaMethodInit* JavaMethods_;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/org_trafodion_sql_HDFSClient.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/org_trafodion_sql_HDFSClient.h b/core/sql/executor/org_trafodion_sql_HDFSClient.h
new file mode 100644
index 0000000..6e3485e
--- /dev/null
+++ b/core/sql/executor/org_trafodion_sql_HDFSClient.h
@@ -0,0 +1,31 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_trafodion_sql_HDFSClient */
+
+#ifndef _Included_org_trafodion_sql_HDFSClient
+#define _Included_org_trafodion_sql_HDFSClient
+#ifdef __cplusplus
+extern "C" {
+#endif
+#undef org_trafodion_sql_HDFSClient_UNCOMPRESSED
+#define org_trafodion_sql_HDFSClient_UNCOMPRESSED 1L
+/*
+ * Class:     org_trafodion_sql_HDFSClient
+ * Method:    copyToByteBuffer
+ * Signature: (Ljava/nio/ByteBuffer;I[BI)I
+ */
+JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_copyToByteBuffer
+  (JNIEnv *, jobject, jobject, jint, jbyteArray, jint);
+
+/*
+ * Class:     org_trafodion_sql_HDFSClient
+ * Method:    sendFileStatus
+ * Signature: (JIIZLjava/lang/String;JJSJLjava/lang/String;Ljava/lang/String;SJ)I
+ */
+JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus
+  (JNIEnv *, jobject, jlong, jint, jint, jboolean, jstring, jlong, jlong, jshort, jlong,
jstring, jstring, jshort, jlong);
+
+#ifdef __cplusplus
+}
+#endif
+#endif

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/exp/ExpLOBinterface.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBinterface.h b/core/sql/exp/ExpLOBinterface.h
index 8194859..54435a3 100644
--- a/core/sql/exp/ExpLOBinterface.h
+++ b/core/sql/exp/ExpLOBinterface.h
@@ -29,6 +29,13 @@
 class HdfsFileInfo
 {
  public:
+  HdfsFileInfo() {
+     entryNum_ = -1;
+     startOffset_ = -1;
+     bytesToRead_ = 0;
+     compressionMethod_ = 0;
+     flags_ = 0;
+  }
   char * fileName() { return fileName_; }
 
   // used for text/seq file access
@@ -39,6 +46,8 @@ class HdfsFileInfo
   Int64 getStartRow() { return startOffset_; }
   Int64 getNumRows() { return bytesToRead_; }
 
+  Int16 getCompressionMethod() const { return compressionMethod_; }
+
   Lng32 getFlags() { return flags_; }
 
   void setFileIsLocal(NABoolean v)
@@ -64,6 +73,7 @@ class HdfsFileInfo
   NABasicPtr  fileName_;
   Int64 startOffset_;
   Int64 bytesToRead_;
+  Int16 compressionMethod_;
 };
 
 typedef HdfsFileInfo* HdfsFileInfoPtr;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/generator/GenFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenFastTransport.cpp b/core/sql/generator/GenFastTransport.cpp
index 75c1e0e..5019485 100644
--- a/core/sql/generator/GenFastTransport.cpp
+++ b/core/sql/generator/GenFastTransport.cpp
@@ -476,8 +476,12 @@ static short ft_codegen(Generator *generator,
     replication
     );
 
+  UInt16 hdfsIoByteArraySize = (UInt16)
+      CmpCommon::getDefaultNumeric(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB);
+  tdb->setHdfsIoByteArraySize(hdfsIoByteArraySize);
   tdb->setSequenceFile(isSequenceFile);
   tdb->setHdfsCompressed(CmpCommon::getDefaultNumeric(TRAF_UNLOAD_HDFS_COMPRESS)!=0);
+  
 
   if ((hiveNAColArray) &&
       (hiveInsertErrMode == 2))

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/generator/GenRelScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp
index 827b94c..0a133b2 100644
--- a/core/sql/generator/GenRelScan.cpp
+++ b/core/sql/generator/GenRelScan.cpp
@@ -1234,7 +1234,8 @@ if (hTabStats->isOrcFile())
      if (hdfsBufSizeTesting)
        hdfsBufSize = hdfsBufSizeTesting;
    }
-
+  UInt16 hdfsIoByteArraySize = (UInt16)
+      CmpCommon::getDefaultNumeric(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB);
   UInt32 rangeTailIOSize = (UInt32)
       CmpCommon::getDefaultNumeric(HDFS_IO_RANGE_TAIL);
   if (rangeTailIOSize == 0) 
@@ -1362,7 +1363,7 @@ if (hTabStats->isOrcFile())
 
                    hdfsRootDir, modTS, numOfPartLevels, hdfsDirsToCheck
 		   );
-
+  hdfsscan_tdb->setHdfsIoByteArraySize(hdfsIoByteArraySize);
   generator->initTdbFields(hdfsscan_tdb);
 
   hdfsscan_tdb->setUseCursorMulti(useCursorMulti);

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/regress/hive/DIFF002.KNOWN
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/DIFF002.KNOWN b/core/sql/regress/hive/DIFF002.KNOWN
new file mode 100644
index 0000000..24c6ed7
--- /dev/null
+++ b/core/sql/regress/hive/DIFF002.KNOWN
@@ -0,0 +1,14 @@
+359,360c359,362
+< (EXPR)
+< ----------
+---
+> *** ERROR[8447] An error occurred during hdfs access. Error Detail: SETUP_HDFS_SCAN
java.io.IOException: LZOP compression codec is not configured in Hadoop
+> stackTraceRemoved
+> stackTraceRemoved
+> stackTraceRemoved
+362,364c364
+<  73049
+< 
+< --- 1 row(s) selected.
+---
+> --- 0 row(s) selected.

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/regress/hive/FILTER002
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/FILTER002 b/core/sql/regress/hive/FILTER002
new file mode 100755
index 0000000..83f8fbf
--- /dev/null
+++ b/core/sql/regress/hive/FILTER002
@@ -0,0 +1,33 @@
+#! /bin/sh
+# @@@ START COPYRIGHT @@@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# @@@ END COPYRIGHT @@@
+
+
+fil=$1
+if [ "$fil" = "" ]; then
+  echo "Usage: $0 filename"
+  exit 1
+fi
+
+fil=$1
+sed "
+s/org.trafodion.sql.*/stackTraceRemoved/g
+" $fil

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/sqlcomp/DefaultConstants.h
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h
index 0096424..b2b2bb9 100644
--- a/core/sql/sqlcomp/DefaultConstants.h
+++ b/core/sql/sqlcomp/DefaultConstants.h
@@ -3319,6 +3319,11 @@ enum DefaultConstants
   
   // This enum constant must be the LAST one in the list; it's a count,
   // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)!
+  // Size of byte[] in java when direct byteBuffer can't be used
+  // Used to read compressed hdfs text files and to write
+  // both compressed and uncompressed hdfs files
+  HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB,
+
   __NUM_DEFAULT_ATTRIBUTES
 };
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 1c47c54..7e5cfa1 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -1496,6 +1496,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS,		"OFF"),
 
   DDui1__(HDFS_IO_BUFFERSIZE,                            "65536"),
   DDui___(HDFS_IO_BUFFERSIZE_BYTES,               "0"),
+  DDui___(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB,    "1024"),
   // The value 0 denotes RangeTail = max record length of table.
   DDui___(HDFS_IO_RANGE_TAIL,                     "0"),
   DDkwd__(HDFS_PREFETCH,                           "ON"),

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 0346bef..5ffcd03 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.io.FileNotFoundException;
 import java.io.EOFException;
 import java.io.OutputStream;
+import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.CompressionInputStream;
 import java.io.EOFException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
@@ -53,31 +55,47 @@ import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 //
 //  To read a range in a Hdfs file, use the constructor
-//   public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+//   public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length, CompressionInputStream inStream) 
 // 
 //  For instance methods like hdfsListDirectory use the constructor
 //     public HDFSClient()
 //
 //  For all static methods use
-//     HdfsClient::<static_method_name>
+//     HDFSClient::<static_method_name>
 //
 
 public class HDFSClient 
 {
+   // Keep the constants and string array below in sync with 
+   // enum CompressionMethod at sql/comexe/ComCompressionInfo.h
+   static final short UNKNOWN_COMPRESSION = 0;
+   static final short UNCOMPRESSED = 1;
+   static final short LZOP = 5;
+   static final String COMPRESSION_TYPE[] = {
+      "UNKNOWN_COMPRESSION", // unable to determine compression method
+      "UNCOMPRESSED",            // file is not compressed
+      "LZO_DEFLATE",             // using LZO deflate compression
+      "DEFLATE",                 // using DEFLATE compression
+      "GZIP",                    // using GZIP compression
+      "LZOP"};                   // using LZOP compression
    static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
    private static Configuration config_ = null;
    private static ExecutorService executorService_ = null;
    private static FileSystem defaultFs_ = null;
+   private static CompressionCodecFactory codecFactory_ = null;
    private FileSystem fs_ = null;
    private int bufNo_;
    private int rangeNo_;
-   private FSDataInputStream fsdis_; 
+   private FSDataInputStream fsdis_;
+           CompressionInputStream inStream_; 
    private OutputStream outStream_;
    private String filename_;
    private ByteBuffer buf_;
+   private byte[] bufArray_;
    private int bufLen_;
    private int bufOffset_ = 0;
    private long pos_ = 0;
@@ -89,7 +107,10 @@ public class HDFSClient
    private int isEOF_ = 0; 
    private int totalBytesWritten_ = 0;
    private Path filepath_ = null;
-   private boolean compression_;
+   boolean compressed_ = false;
+   private CompressionCodec codec_ = null;
+   private short compressionType_;
+   private short ioByteArraySize_;
    static {
       String confFile = System.getProperty("trafodion.log4j.configFile");
       System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
@@ -105,6 +126,7 @@ public class HDFSClient
       catch (IOException ioe) {
          throw new RuntimeException("Exception in HDFSClient static block", ioe);
       }
+      codecFactory_ = new CompressionCodecFactory(config_); 
       System.loadLibrary("executor");
    }
 
@@ -125,6 +147,9 @@ public class HDFSClient
       {
          int bytesRead;
          int totalBytesRead = 0;
+         if (compressed_) {
+            bufArray_ = new byte[ioByteArraySize_ * 1024];
+         } else 
          if (! buf_.hasArray()) {
             try {
               fsdis_.seek(pos_);
@@ -135,10 +160,14 @@ public class HDFSClient
          }
          do
          {
-            if (buf_.hasArray())
-               bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
-            else 
-               bytesRead = fsdis_.read(buf_);
+            if (compressed_) {
+               bytesRead = compressedFileRead(lenRemain_);
+            } else {
+               if (buf_.hasArray())
+                  bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
+               else 
+                  bytesRead = fsdis_.read(buf_);
+            }
             if (bytesRead == -1) {
                isEOF_ = 1;
                break;
@@ -151,10 +180,38 @@ public class HDFSClient
             bufOffset_ += bytesRead;
             pos_ += bytesRead;
             lenRemain_ -= bytesRead;
-         } while (lenRemain_ > 0);
+         } while (lenRemain_ > 0); 
          return new Integer(totalBytesRead);
       }
-   }
+    } 
+
+    int compressedFileRead(int readLenRemain) throws IOException 
+    {
+       int totalReadLen = 0;
+       int readLen;
+       int offset = 0;
+       int retcode;
+
+         int lenRemain = ((readLenRemain > bufArray_.length) ? bufArray_.length : readLenRemain);
+         do 
+         {
+            readLen = inStream_.read(bufArray_, offset, lenRemain);
+            if (readLen == -1 || readLen == 0)
+               break;
+            totalReadLen += readLen;
+            offset  += readLen;
+            lenRemain -= readLen;
+         } while (lenRemain > 0);
+         if (totalReadLen > 0) {
+            if ((retcode = copyToByteBuffer(buf_, bufOffset_, bufArray_, totalReadLen)) != 0)
+               throw new IOException("Failure to copy to the DirectByteBuffer in the native layer with error code " + retcode);
+         }
+         else
+            totalReadLen = -1;
+         return totalReadLen; 
+    } 
+
+    native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] bufArray, int copyLen);
        
    public HDFSClient() 
    {
@@ -166,14 +223,30 @@ public class HDFSClient
    // The passed in length can never be more than the size of the buffer
    // If the range has a length more than the buffer length, the range is chunked
    // in HdfsScan
-   public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+   public HDFSClient(int bufNo, short ioByteArraySize, int rangeNo, String filename, ByteBuffer buffer, long position, 
+                int length, short compressionType, CompressionInputStream inStream) throws IOException
    {
       bufNo_ = bufNo; 
       rangeNo_ = rangeNo;
       filename_ = filename;
+      ioByteArraySize_ = ioByteArraySize;
       filepath_ = new Path(filename_);
       fs_ = FileSystem.get(filepath_.toUri(),config_);
-      fsdis_ = fs_.open(filepath_);
+      compressionType_ = compressionType;
+      inStream_ = inStream;
+      codec_ = codecFactory_.getCodec(filepath_);
+      if (codec_ != null) {
+        compressed_ = true;
+        if (inStream_ == null)
+           inStream_ = codec_.createInputStream(fs_.open(filepath_));
+      }
+      else {
+        if ((compressionType_ != UNCOMPRESSED) && (compressionType_ != UNKNOWN_COMPRESSION))
+           throw new IOException(COMPRESSION_TYPE[compressionType_] + " compression codec is not configured in Hadoop");
+        if (filename_.endsWith(".lzo"))
+           throw new IOException(COMPRESSION_TYPE[LZOP] + " compression codec is not configured in Hadoop");
+        fsdis_ = fs_.open(filepath_);
+      }
       blockSize_ = (int)fs_.getDefaultBlockSize(filepath_);
       buf_  = buffer;
       bufOffset_ = 0;
@@ -201,7 +274,8 @@ public class HDFSClient
       int bytesRead;
       retObject = (Integer)future_.get();
       bytesRead = retObject.intValue();
-      fsdis_.close();
+      if (! compressed_)
+         fsdis_.close();
       fsdis_ = null;
       return bytesRead;
    }  
@@ -226,7 +300,7 @@ public class HDFSClient
         filepath_ = new Path(fname + ".gz");
         
       fs_ = FileSystem.get(filepath_.toUri(),config_);
-      compression_ = compress;
+      compressed_ = compress;
       fsdis_ = null;      
       FSDataOutputStream fsOut;
       if (overwrite)
@@ -237,7 +311,7 @@ public class HDFSClient
       else
          fsOut = fs_.create(filepath_);
 
-      if (compression_) {
+      if (compressed_) {
           GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
           Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
           outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
@@ -256,7 +330,7 @@ public class HDFSClient
       else
         filepath_ = new Path(fname + ".gz");
       fs_ = FileSystem.get(filepath_.toUri(),config_);
-      compression_ = compress;  
+      compressed_ = compress;  
       outStream_ = null;
       fsdis_ = null;      
       return true;
@@ -274,7 +348,7 @@ public class HDFSClient
          else
             fsOut = fs_.create(filepath_);
       
-         if (compression_) {
+         if (compressed_) {
             GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
             Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
             outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
@@ -285,21 +359,23 @@ public class HDFSClient
             logger_.debug("HDFSClient.hdfsWrite() - output stream created" );
       }
       outStream_.write(buff);
-      if (outStream_ instanceof FSDataOutputStream)
-         totalBytesWritten_ = ((FSDataOutputStream)outStream_).size();
-      else
-         totalBytesWritten_ += buff.length; 
       if (logger_.isDebugEnabled()) 
-         logger_.debug("HDFSClient.hdfsWrite() - bytes written " + totalBytesWritten_ );
-      return totalBytesWritten_;
+         logger_.debug("HDFSClient.hdfsWrite() - bytes written " + buff.length);
+      return buff.length;
     }
 
     int hdfsRead(ByteBuffer buffer) throws IOException
     {
       if (logger_.isDebugEnabled()) 
          logger_.debug("HDFSClient.hdfsRead() - started" );
-      if (fsdis_ == null) {
-         fsdis_ = fs_.open(filepath_);
+      if (fsdis_ == null && inStream_ == null ) {
+         codec_ = codecFactory_.getCodec(filepath_);
+         if (codec_ != null) {
+            compressed_ = true;
+            inStream_ = codec_.createInputStream(fs_.open(filepath_));
+         }
+         else
+            fsdis_ = fs_.open(filepath_);
          pos_ = 0;
       }
       int lenRemain;   
@@ -307,6 +383,8 @@ public class HDFSClient
       int totalBytesRead = 0;
       int bufLen;
       int bufOffset = 0;
+      if (compressed_ && bufArray_ != null) 
+         bufArray_ = new byte[ioByteArraySize_ * 1024];
       if (buffer.hasArray())
          bufLen = buffer.array().length;
       else
@@ -314,10 +392,14 @@ public class HDFSClient
       lenRemain = bufLen;
       do
       {
-         if (buffer.hasArray())
-            bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain);
-         else
-            bytesRead = fsdis_.read(buffer);    
+         if (compressed_) {
+            bytesRead = compressedFileRead(lenRemain);
+         } else {
+           if (buffer.hasArray()) 
+              bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain);
+           else
+              bytesRead = fsdis_.read(buffer);    
+         }
          if (bytesRead == -1 || bytesRead == 0)
             break;    
          totalBytesRead += bytesRead;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index 99f021d..b438fb2 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.CompressionInputStream;
 import java.net.URI;
 
 // This class implements an efficient mechanism to read hdfs files
@@ -72,6 +73,9 @@ public class HdfsScan
    private long currRangeLenRemain_;
    private int lastBufCompleted_ = -1;
    private boolean scanCompleted_;
+   private CompressionInputStream currInStream_;
+   private short ioByteArraySize_;
+ 
  
    // Structure to hold the Scan ranges for this HdfsScan instance
    //
@@ -82,13 +86,15 @@ public class HdfsScan
       long pos_;
       long len_;
       int tdbRangeNum_;
+      short compressionType_;
       
-      HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
+      HdfsScanRange(String filename, long pos, long len, int tdbRangeNum, short compressionType)
       {
          filename_ = filename;
          pos_ = pos;
          len_ = len;
          tdbRangeNum_ = tdbRangeNum;
+         compressionType_ = compressionType;
       }
    }
    
@@ -103,7 +109,8 @@ public class HdfsScan
    {
    }
 
-   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException
+   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, short ioByteArraySize, String filename[], long pos[], 
+            long len[], int rangeNum[], short compressionType[]) throws IOException
    {
       // Two buffers to hold the data read
       buf_ = new ByteBuffer[2];
@@ -111,7 +118,7 @@ public class HdfsScan
 
       buf_[0] = buf1;
       buf_[1] = buf2;
-
+      ioByteArraySize_ = ioByteArraySize;
       for (int i = 0; i < 2 ; i++) {
           if (buf_[i].hasArray())
              bufLen_[i] = buf_[i].array().length;
@@ -121,12 +128,13 @@ public class HdfsScan
       hdfsClient_ = new HDFSClient[2];
       hdfsScanRanges_ = new HdfsScanRange[filename.length]; 
       for (int i = 0; i < filename.length; i++) {
-         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]);
+         hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i], compressionType[i]);
       }
       if (hdfsScanRanges_.length > 0) {
          currRange_ = 0;
          currRangePos_ = hdfsScanRanges_[currRange_].pos_;
          currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; 
+         currInStream_ = null;
          scheduleHdfsScanRange(0, 0);
       }
       scanCompleted_ = false;
@@ -146,6 +154,9 @@ public class HdfsScan
             currRange_++;
             currRangePos_ = hdfsScanRanges_[currRange_].pos_;
             currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_; 
+            if (currInStream_ != null)
+               currInStream_.close();
+            currInStream_ = null;
          }
       } 
       if (currRangeLenRemain_ > bufLen_[bufNo])
@@ -155,7 +166,11 @@ public class HdfsScan
       if (! scanCompleted_) {
          if (logger_.isDebugEnabled())
            logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + currRangeLenRemain_ + " BufNo " + bufNo); 
-         hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].tdbRangeNum_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currRangePos_, readLength);
+         hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySize_, hdfsScanRanges_[currRange_].tdbRangeNum_, 
+			hdfsScanRanges_[currRange_].filename_, 
+                        buf_[bufNo], currRangePos_, readLength, 
+                        hdfsScanRanges_[currRange_].compressionType_, currInStream_);
+                        
       }
    } 
   
@@ -188,6 +203,7 @@ public class HdfsScan
             bufNo = 0;
             rangeNo = hdfsClient_[0].getRangeNo();
             isEOF = hdfsClient_[0].isEOF();
+            currInStream_ = hdfsClient_[0].inStream_;
             break;
          case 0:
             // Wait for the read to complete in buffer 1
@@ -195,6 +211,7 @@ public class HdfsScan
             bufNo = 1;
             rangeNo = hdfsClient_[1].getRangeNo();
             isEOF = hdfsClient_[1].isEOF();
+            currInStream_ = hdfsClient_[1].inStream_;
             break;
          default:
             bufNo = -1;
@@ -218,6 +235,9 @@ public class HdfsScan
             currRangePos_ = hdfsScanRanges_[currRange_].pos_;
             currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_;
             bytesRead = 0;
+            if (currInStream_ != null)
+               currInStream_.close();
+            currInStream_ = null;
          }
       }
       switch (lastBufCompleted_)
@@ -278,6 +298,7 @@ public class HdfsScan
       long pos[] = new long[file_status.length * split];
       long len[] = new long[file_status.length * split];
       int range[] = new int[file_status.length * split];
+      short compress[] = new short[file_status.length * split];
       for (int i = 0 ; i < file_status.length * split; i++) {
          Path filePath = file_status[i].getPath();
          long fileLen = file_status[i].getLen(); 
@@ -293,13 +314,14 @@ public class HdfsScan
             range[i] = i;
             if (j == (split-1))
                len[i] = fileLen - (splitLen *(j));
+            compress[i] = 1; // Uncompressed
             System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + len[i]); 
             i++;
          }
       }
       long time1 = System.currentTimeMillis();
       HdfsScan hdfsScan = new HdfsScan();
-      hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len, range);
+      hdfsScan.setScanRanges(buf1, buf2, (short)512, fileName, pos, len, range, compress);
       int[] retArray;
       int bytesCompleted;
       ByteBuffer buf;


Mime
View raw message