trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/4] trafodion git commit: [TRAFODION-2982] JNI HDFS interface should support varied sized large buffers for read/write
Date Thu, 10 May 2018 21:08:22 GMT
[TRAFODION-2982] JNI HDFS interface should support varied sized large buffers for read/write

Changed the interim byte array size to be of int type to accommodate
at least 64MB as per review comments.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/d9ee71e1
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/d9ee71e1
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/d9ee71e1

Branch: refs/heads/master
Commit: d9ee71e10bf9054be28ca000aa64553abc00584f
Parents: 96cab4d
Author: selvaganesang <selva.govindarajan@esgyn.com>
Authored: Wed May 9 23:35:46 2018 +0000
Committer: selvaganesang <selva.govindarajan@esgyn.com>
Committed: Wed May 9 23:35:46 2018 +0000

----------------------------------------------------------------------
 core/sql/comexe/ComCompressionInfo.cpp          | 84 ++++++++++++++++++
 core/sql/comexe/ComCompressionInfo.h            | 91 ++++++++++++++++++++
 core/sql/comexe/ComTdbFastTransport.cpp         |  2 +-
 core/sql/comexe/ComTdbFastTransport.h           |  8 +-
 core/sql/comexe/ComTdbHdfsScan.cpp              |  2 +-
 core/sql/comexe/ComTdbHdfsScan.h                |  9 +-
 core/sql/executor/ExHdfsScan.cpp                |  2 +-
 core/sql/executor/HdfsClient_JNI.cpp            | 21 ++---
 core/sql/executor/HdfsClient_JNI.h              | 12 +--
 core/sql/nskgmake/comexe/Makefile               |  3 +-
 .../main/java/org/trafodion/sql/HDFSClient.java | 10 +--
 .../main/java/org/trafodion/sql/HdfsScan.java   |  9 +-
 12 files changed, 214 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComCompressionInfo.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComCompressionInfo.cpp b/core/sql/comexe/ComCompressionInfo.cpp
new file mode 100644
index 0000000..75ae6ce
--- /dev/null
+++ b/core/sql/comexe/ComCompressionInfo.cpp
@@ -0,0 +1,84 @@
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+
+#include "ComCompressionInfo.h"
+
+ComCompressionInfo::~ComCompressionInfo()
+{}
+
+void ComCompressionInfo::setCompressionMethod(const char *fileName)
+{
+  compressionMethod_ = getCompressionMethodFromFileName(fileName);
+}
+
+ComCompressionInfo::CompressionMethod ComCompressionInfo::getCompressionMethodFromFileName(
+     const char *f)
+{
+  const char * ret = strcasestr(f, ".lzo_deflate");
+  if (ret)
+    return LZO_DEFLATE;
+  ret = strcasestr(f, ".lzo");
+  if (ret)
+    return LZOP;
+  ret = strcasestr(f, ".deflate");
+  if (ret)
+    return DEFLATE;
+  ret = strcasestr(f, ".gz");
+  if (ret)
+    return GZIP;
+
+  return UNCOMPRESSED;
+}
+
+// virtual methods overridden from NAVersionedObject base class
+
+char *ComCompressionInfo::findVTblPtr(short classID)
+{
+  char *vtblPtr;
+  GetVTblPtr(vtblPtr, ComCompressionInfo);
+
+  return vtblPtr;
+}
+
+unsigned char ComCompressionInfo::getClassVersionID()
+{
+  return 1;
+}
+
+void ComCompressionInfo::populateImageVersionIDArray()
+{
+  setImageVersionID(0,getClassVersionID());
+}
+
+short ComCompressionInfo::getClassSize()
+{
+  return (short) sizeof(ComCompressionInfo);
+}
+
+Long ComCompressionInfo::pack(void * space)
+{
+  return NAVersionedObject::pack(space);
+}
+
+Lng32 ComCompressionInfo::unpack(void * base, void * reallocator)
+{
+  return NAVersionedObject::unpack(base, reallocator);
+}

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComCompressionInfo.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComCompressionInfo.h b/core/sql/comexe/ComCompressionInfo.h
new file mode 100644
index 0000000..3927bad
--- /dev/null
+++ b/core/sql/comexe/ComCompressionInfo.h
@@ -0,0 +1,91 @@
+/**********************************************************************
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+/* -*-C++-*-
+****************************************************************************
+*
+* File:         ComCompressionInfo.h
+* Description:  Description of the compression method used; for now
+*               this is used for Hive tables, but it could be
+*               expanded to other objects.
+* Created:      4/20/16
+* Language:     C++
+*
+****************************************************************************
+*/
+
+#ifndef COM_COMPRESSION_INFO_H
+#define COM_COMPRESSION_INFO_H
+
+#include "NAVersionedObject.h"
+
+class ComCompressionInfo : public NAVersionedObject
+{
+public:
+  // Update the COMPRESSION_TYPE[] at org/trafodion/sql/HDFSClient.java when new enum is
added
+  enum CompressionMethod
+    { UNKNOWN_COMPRESSION = 0, // unable to determine compression method
+      UNCOMPRESSED,            // file is not compressed
+      LZO_DEFLATE,             // using LZO deflate compression
+      DEFLATE,                 // using DEFLATE compression
+      GZIP,                    // using GZIP compression
+      LZOP,                   // using LZOP compression
+      SUPPORTED_COMPRESSIONS }; // Add any compression type above this line 
+
+  ComCompressionInfo(CompressionMethod cm = UNKNOWN_COMPRESSION) :
+       NAVersionedObject(-1),
+       compressionMethod_(cm)
+  {}
+
+  virtual ~ComCompressionInfo();
+
+  bool operator==(const ComCompressionInfo &o) const
+                         { return compressionMethod_ == o.compressionMethod_; }
+
+  CompressionMethod getCompressionMethod() const { return compressionMethod_; }
+  void setCompressionMethod(const char *fileName);
+
+  NABoolean isCompressed() const
+                       { return (compressionMethod_ != UNCOMPRESSED &&
+                                 compressionMethod_ != UNKNOWN_COMPRESSION ); }
+
+  NABoolean splitsAllowed() const                   { return !isCompressed(); }
+
+  // try to determine the compression method just from a file name
+  static CompressionMethod getCompressionMethodFromFileName(const char *f);
+
+  // ---------------------------------------------------------------------
+  // Redefine virtual functions required for Versioning.
+  //----------------------------------------------------------------------
+  virtual char *findVTblPtr(short classID);
+  virtual unsigned char getClassVersionID();
+  virtual void populateImageVersionIDArray();
+  virtual short getClassSize();
+  virtual Long pack(void * space);
+  virtual Lng32 unpack(void * base, void * reallocator);
+
+private:
+
+  CompressionMethod compressionMethod_;
+
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.cpp b/core/sql/comexe/ComTdbFastTransport.cpp
index 90f635f..0b9cf9e 100644
--- a/core/sql/comexe/ComTdbFastTransport.cpp
+++ b/core/sql/comexe/ComTdbFastTransport.cpp
@@ -99,7 +99,7 @@ ComTdbFastExtract::ComTdbFastExtract(
   hdfsReplication_(replication),
   ioTimeout_(ioTimeout),
   childDataRowLen_(childDataRowLen),
-  hdfsIoByteArraySize_(0),
+  hdfsIoByteArraySizeInKB_(0),
   modTSforDir_(-1)
 {
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbFastTransport.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.h b/core/sql/comexe/ComTdbFastTransport.h
index 0666953..2fb89cc 100644
--- a/core/sql/comexe/ComTdbFastTransport.h
+++ b/core/sql/comexe/ComTdbFastTransport.h
@@ -370,9 +370,9 @@ public:
   Int64 getModTSforDir() const { return modTSforDir_; }
 
   void setHdfsIoByteArraySize(int size)             
-    { hdfsIoByteArraySize_ = size; }                          
-  UInt16 getHdfsIoByteArraySize()                             
-    { return hdfsIoByteArraySize_; }
+    { hdfsIoByteArraySizeInKB_ = size; }                          
+  Lng32 getHdfsIoByteArraySize()                             
+    { return hdfsIoByteArraySizeInKB_; }
 protected:
   NABasicPtr   targetName_;                                  // 00 - 07
   NABasicPtr   delimiter_;                                   // 08 - 15
@@ -399,7 +399,7 @@ protected:
   UInt16       filler_;                                      // 130 - 131
   UInt32       childDataRowLen_;                             // 132 - 135
   Int64        modTSforDir_;                                 // 136 - 143
-  UInt16       hdfsIoByteArraySize_;                         // 144 - 147 
+  Lng32        hdfsIoByteArraySizeInKB_;                     // 144 - 147 
 
   // Make sure class size is a multiple of 8
   char fillerComTdbFastTransport_[4];                        // 148 - 151

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.cpp b/core/sql/comexe/ComTdbHdfsScan.cpp
index f5e2907..fb58b54 100755
--- a/core/sql/comexe/ComTdbHdfsScan.cpp
+++ b/core/sql/comexe/ComTdbHdfsScan.cpp
@@ -121,7 +121,7 @@ ComTdbHdfsScan::ComTdbHdfsScan(
   hdfsRootDir_(hdfsRootDir),
   modTSforDir_(modTSforDir),
   numOfPartCols_(numOfPartCols),
-  hdfsIoByteArraySize_(0),
+  hdfsIoByteArraySizeInKB_(0),
   hdfsDirsToCheck_(hdfsDirsToCheck)
 {};
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/comexe/ComTdbHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h
index f9a0afd..80a0280 100755
--- a/core/sql/comexe/ComTdbHdfsScan.h
+++ b/core/sql/comexe/ComTdbHdfsScan.h
@@ -136,14 +136,13 @@ class ComTdbHdfsScan : public ComTdb
   UInt16 origTuppIndex_;                                      // 188 - 189
   char fillersComTdbHdfsScan1_[2];                            // 190 - 191
   NABasicPtr nullFormat_;                                     // 192 - 199
-  UInt16 hdfsIoByteArraySize_;                                // 198 - 199
 
   // next 4 params are used to check if data under hdfsFileDir
   // was modified after query was compiled.
   NABasicPtr hdfsRootDir_;                                     // 200 - 207
   Int64  modTSforDir_;                                         // 208 - 215
   Lng32  numOfPartCols_;                                       // 216 - 219
-  char fillersComTdbHdfsScan2_[4];                             // 220 - 223
+  Lng32  hdfsIoByteArraySizeInKB_;                             // 220 - 223
   QueuePtr hdfsDirsToCheck_;                                   // 224 - 231
     
 public:
@@ -364,9 +363,9 @@ public:
  
   char *hdfsRootDir() { return hdfsRootDir_; }
   void setHdfsIoByteArraySize(int size)
-    { hdfsIoByteArraySize_ = size; }
-  UInt16 getHdfsIoByteArraySize() 
-    { return hdfsIoByteArraySize_; }
+    { hdfsIoByteArraySizeInKB_ = size; }
+  int getHdfsIoByteArraySize() 
+    { return hdfsIoByteArraySizeInKB_; }
 };
 
 inline ComTdb * ComTdbHdfsScan::getChildTdb()

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index e5d73dc..9c68aa7 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -569,7 +569,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 break;
              } 
              hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,

-                            hdfsScanTdb().hdfsIoByteArraySize_, 
+                            hdfsScanTdb().hdfsIoByteArraySizeInKB_, 
                             &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,

                             hdfsStats_, hdfsScanRetCode);
              if (hdfsScanRetCode != HDFS_SCAN_OK) {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp
index 5b8e850..72157bf 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -27,6 +27,7 @@
 #include "jni.h"
 #include "HdfsClient_JNI.h"
 #include "org_trafodion_sql_HDFSClient.h"
+#include "ComCompressionInfo.h"
 
 // ===========================================================================
 // ===== Class HdfsScan
@@ -84,7 +85,7 @@ HDFS_Scan_RetCode HdfsScan::init()
     JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
     JavaMethods_[JM_CTOR      ].jm_signature = "()V";
     JavaMethods_[JM_SET_SCAN_RANGES].jm_name      = "setScanRanges";
-    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;S[Ljava/lang/String;[J[J[I[S)V";
+    JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;I[Ljava/lang/String;[J[J[I[S)V";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_name      = "trafHdfsRead";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
     JavaMethods_[JM_STOP].jm_name      = "stop";
@@ -107,7 +108,7 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
 }
 
 /////////////////////////////////////////////////////////////////////////////
-HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,  int
scanBufSize, short hdfsIoByteArraySize,
+HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,  int
scanBufSize, int hdfsIoByteArraySizeInKB,
       HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called.");
@@ -139,7 +140,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfsScan
       jenv_->PopLocalFrame(NULL);
       return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
    }
-   jshort j_hdfsIoByteArraySize = hdfsIoByteArraySize;
+   jint j_hdfsIoByteArraySizeInKB = hdfsIoByteArraySizeInKB;
    jobjectArray j_filenames = NULL;
    jlongArray j_offsets = NULL;
    jlongArray j_lens = NULL;  
@@ -212,14 +213,14 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfsScan
           }
        }
        short compressionMethod = (short)hdfo->getCompressionMethod();
-       //ex_assert(compressionMethod >= 0 && compressionMethod <= ComCompressionInfo::LZOP,
"Illegal CompressionMethod Value");
+       ex_assert(compressionMethod >= 0 && compressionMethod < ComCompressionInfo::SUPPORTED_COMPRESSIONS,
"Illegal CompressionMethod Value");
        jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, &compressionMethod);
    } 
 
    if (hdfsStats_ != NULL)
        hdfsStats_->getHdfsTimer().start();
    tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name;
-   jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1,
j_buf2, j_hdfsIoByteArraySize, 
+   jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1,
j_buf2, j_hdfsIoByteArraySizeInKB, 
                       j_filenames, j_offsets, j_lens, j_rangenums, j_compress);
    if (hdfsStats_ != NULL) {
       hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
@@ -235,7 +236,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfsScan
 }
 
 HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,
 int scanBufSize,
-      short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum,
Int32 numRanges, int rangeTailIOSize, 
+      int hdfsIoByteArraySizeInKB, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum,
Int32 numRanges, int rangeTailIOSize, 
       ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called.");
@@ -247,7 +248,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF
*hdfs
    if (hdfsScan != NULL) {
        hdfsScanRetCode = hdfsScan->init();
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
-          hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySize,
 
+          hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySizeInKB,
 
                     hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize); 
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
           hdfsScan->setHdfsStats(hdfsStats);
@@ -378,7 +379,7 @@ void HdfsClient::deleteHdfsFileInfo()
    hdfsFileInfo_ = NULL;
 }
 
-HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode, short hdfsIoByteArraySize)
+HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode, int hdfsIoByteArraySizeInKB)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called.");
 
@@ -390,7 +391,7 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats,
HD
        retCode = hdfsClient->init();
        if (retCode == HDFS_CLIENT_OK) {
           hdfsClient->setHdfsStats(hdfsStats);
-          hdfsClient->setIoByteArraySize(hdfsIoByteArraySize);
+          hdfsClient->setIoByteArraySize(hdfsIoByteArraySizeInKB);
        }
        else {
           NADELETE(hdfsClient, HdfsClient, heap);
@@ -597,7 +598,7 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode
&hd
   }
   Int64 lenRemain = len;
   Int64 writeLen;
-  Int64 chunkLen = (ioByteArraySize_ > 0 ? ioByteArraySize_ * 1024 : 0);
+  Int64 chunkLen = (ioByteArraySizeInKB_ > 0 ? ioByteArraySizeInKB_ * 1024 : 0);
   Int64 offset = 0;
   do 
   {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h
index a85c590..888451c 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -66,11 +66,11 @@ public:
   // Get the error description.
   static char* getErrorText(HDFS_Scan_RetCode errEnum);
 
-  static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int
scanBufSize, short hdfsIoByteArraySize, 
+  static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int
scanBufSize, int hdfsIoByteArraySizeInKB, 
             HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int
rangeTailIOSize,
             ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode);
 
-  HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
short hdfsIoByteArraySize, 
+  HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
int hdfsIoByteArraySizeInKB, 
             HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, 
             int rangeTailIOSize);
 
@@ -169,11 +169,11 @@ public:
   }
  
   ~HdfsClient();
-  static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode, short hdfsIoByteArraySize = 0);
+  static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode
&retCode, int hdfsIoByteArraySizeInKB = 0);
   static HdfsClient *getInstance();
   static void deleteInstance();
-  void setIoByteArraySize(short size)
-      { ioByteArraySize_ = size; }
+  void setIoByteArraySize(int size)
+      { ioByteArraySizeInKB_ = size; }
 
   // Get the error description.
   static char* getErrorText(HDFS_Client_RetCode errEnum);
@@ -226,7 +226,7 @@ private:
   int numFiles_;
   char *path_;
   Int64 totalBytesWritten_;
-  short ioByteArraySize_;
+  Int32 ioByteArraySizeInKB_;
   ExHdfsScanStats *hdfsStats_;
   static jclass javaClass_;
   static JavaMethodInit* JavaMethods_;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/nskgmake/comexe/Makefile
----------------------------------------------------------------------
diff --git a/core/sql/nskgmake/comexe/Makefile b/core/sql/nskgmake/comexe/Makefile
index 6cc6c4e..4ae6e75 100755
--- a/core/sql/nskgmake/comexe/Makefile
+++ b/core/sql/nskgmake/comexe/Makefile
@@ -72,7 +72,8 @@ CPPSRC := CmpMessage.cpp \
 	udrtabledescinfo.cpp  \
 	ComTrace.cpp \
 	ComTdbCancel.cpp \
-	ComTdbHdfsScan.cpp
+	ComTdbHdfsScan.cpp \
+        ComCompressionInfo.cpp 
 
 
 CPPSRC += vers_libcomexe.cpp

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 5ffcd03..2f24dce 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -110,7 +110,7 @@ public class HDFSClient
    boolean compressed_ = false;
    private CompressionCodec codec_ = null;
    private short compressionType_;
-   private short ioByteArraySize_;
+   private int ioByteArraySizeInKB_;
    static {
       String confFile = System.getProperty("trafodion.log4j.configFile");
       System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
@@ -148,7 +148,7 @@ public class HDFSClient
          int bytesRead;
          int totalBytesRead = 0;
          if (compressed_) {
-            bufArray_ = new byte[ioByteArraySize_ * 1024];
+            bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
          } else 
          if (! buf_.hasArray()) {
             try {
@@ -223,13 +223,13 @@ public class HDFSClient
    // The passed in length can never be more than the size of the buffer
    // If the range has a length more than the buffer length, the range is chunked
    // in HdfsScan
-   public HDFSClient(int bufNo, short ioByteArraySize, int rangeNo, String filename, ByteBuffer
buffer, long position, 
+   public HDFSClient(int bufNo, int ioByteArraySizeInKB, int rangeNo, String filename, ByteBuffer
buffer, long position, 
                 int length, short compressionType, CompressionInputStream inStream) throws
IOException
    {
       bufNo_ = bufNo; 
       rangeNo_ = rangeNo;
       filename_ = filename;
-      ioByteArraySize_ = ioByteArraySize;
+      ioByteArraySizeInKB_ = ioByteArraySizeInKB;
       filepath_ = new Path(filename_);
       fs_ = FileSystem.get(filepath_.toUri(),config_);
       compressionType_ = compressionType;
@@ -384,7 +384,7 @@ public class HDFSClient
       int bufLen;
       int bufOffset = 0;
       if (compressed_ && bufArray_ != null) 
-         bufArray_ = new byte[ioByteArraySize_ * 1024];
+         bufArray_ = new byte[ioByteArraySizeInKB_ * 1024];
       if (buffer.hasArray())
          bufLen = buffer.array().length;
       else

http://git-wip-us.apache.org/repos/asf/trafodion/blob/d9ee71e1/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index b438fb2..48d5768 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -74,12 +74,11 @@ public class HdfsScan
    private int lastBufCompleted_ = -1;
    private boolean scanCompleted_;
    private CompressionInputStream currInStream_;
-   private short ioByteArraySize_;
+   private int ioByteArraySizeInKB_;
  
  
    // Structure to hold the Scan ranges for this HdfsScan instance
    //
-   
    class HdfsScanRange 
    {
       String filename_;
@@ -109,7 +108,7 @@ public class HdfsScan
    {
    }
 
-   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, short ioByteArraySize, String
filename[], long pos[], 
+   public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, int ioByteArraySizeInKB, String
filename[], long pos[], 
             long len[], int rangeNum[], short compressionType[]) throws IOException
    {
       // Two buffers to hold the data read
@@ -118,7 +117,7 @@ public class HdfsScan
 
       buf_[0] = buf1;
       buf_[1] = buf2;
-      ioByteArraySize_ = ioByteArraySize;
+      ioByteArraySizeInKB_ = ioByteArraySizeInKB;
       for (int i = 0; i < 2 ; i++) {
           if (buf_[i].hasArray())
              bufLen_[i] = buf_[i].array().length;
@@ -166,7 +165,7 @@ public class HdfsScan
       if (! scanCompleted_) {
          if (logger_.isDebugEnabled())
             logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + "
LenRemain " + currRangeLenRemain_ + " BufNo " + bufNo); 
-         hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySize_, hdfsScanRanges_[currRange_].tdbRangeNum_,

+         hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySizeInKB_, hdfsScanRanges_[currRange_].tdbRangeNum_,

 			hdfsScanRanges_[currRange_].filename_, 
                         buf_[bufNo], currRangePos_, readLength, 
                         hdfsScanRanges_[currRange_].compressionType_, currInStream_);


Mime
View raw message