trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dbirds...@apache.org
Subject [2/3] incubator-trafodion git commit: [TRAFODION-9] Non-blocking hbase operation to smoothen the data flow in trafodion engine
Date Fri, 11 Sep 2015 17:59:41 GMT
[TRAFODION-9] Non-blocking hbase operation to smoothen the data flow in trafodion engine

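Renamed the asynchronous operator steps to COMPLETE_ASYNC_INSERT and COMPLETE_ASYNC_OPERATION (previously ASYNC_INSERT_COMPLETE and ASYNC_OPERATION_COMPLETE).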
Changed the down queue length of the rowset operator to be derived from the HBASE_ROWSET_VSBB_SIZE CQD, rounded up to a power of two, instead of the hard-coded value of 400.
Fixed the statistics so that "Actual Rows Used" is reported correctly in the output of "get statistics for qid <qid> default".


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/c66deca7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/c66deca7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/c66deca7

Branch: refs/heads/master
Commit: c66deca7206f5442eaa31db822797b05ce58b42d
Parents: f6dba67
Author: selvaganesang <selva.govindarajan@esgyn.com>
Authored: Fri Sep 11 05:17:48 2015 +0000
Committer: selvaganesang <selva.govindarajan@esgyn.com>
Committed: Fri Sep 11 05:17:48 2015 +0000

----------------------------------------------------------------------
 core/sql/executor/ExHbaseAccess.h   |  4 +--
 core/sql/executor/ExHbaseIUD.cpp    | 62 +++++++++++++++-----------------
 core/sql/generator/GenRelUpdate.cpp | 12 +++++--
 core/sql/sqlcomp/nadefaults.cpp     |  2 +-
 4 files changed, 41 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c66deca7/core/sql/executor/ExHbaseAccess.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.h b/core/sql/executor/ExHbaseAccess.h
index 878e5e3..e6688d2 100644
--- a/core/sql/executor/ExHbaseAccess.h
+++ b/core/sql/executor/ExHbaseAccess.h
@@ -836,7 +836,7 @@ public:
     , HANDLE_ERROR
     , DONE
     , ALL_DONE
-    , ASYNC_INSERT_COMPLETE
+    , COMPLETE_ASYNC_INSERT
 
   } step_;
 
@@ -1145,7 +1145,7 @@ public:
     , CREATE_ROW
     , APPLY_PRED
     , RETURN_ROW
-    , ASYNC_OPERATION_COMPLETE
+    , COMPLETE_ASYNC_OPERATION
   } step_;
 
   ExHbaseAccessSQRowsetTcb( const ExHbaseAccessTdb &tdb,

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c66deca7/core/sql/executor/ExHbaseIUD.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp
index 4766d1e..5b1a3db 100644
--- a/core/sql/executor/ExHbaseIUD.cpp
+++ b/core/sql/executor/ExHbaseIUD.cpp
@@ -544,6 +544,8 @@ ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work()
 		break;
 	      }
 	    
+	    if (getHbaseAccessStats())
+	      getHbaseAccessStats()->incUsedRows();
 
 	    if (hbaseAccessTdb().returnRow()) {
 		step_ = RETURN_ROW;
@@ -551,18 +553,15 @@ ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work()
             }
 	    matches_++;
 	    if (asyncOperation_) {
-                step_ = ASYNC_INSERT_COMPLETE;
+                step_ = COMPLETE_ASYNC_INSERT;
                 return WORK_CALL_AGAIN;
             }
             else {
 	        step_ = INSERT_CLOSE;
 	    }
-	    if (getHbaseAccessStats())
-	      getHbaseAccessStats()->incUsedRows();
-	        
 	  }
 	  break;
-        case ASYNC_INSERT_COMPLETE:
+        case COMPLETE_ASYNC_INSERT:
           {
             if (resultArray_  == NULL)
                 resultArray_ = new (getHeap()) NABoolean[1];
@@ -591,8 +590,6 @@ ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work()
 		step_ = HANDLE_ERROR;
 		break;
             }
-	    if (getHbaseAccessStats())
-	      getHbaseAccessStats()->incUsedRows();
 	    step_ = INSERT_CLOSE;
           }
           break;
@@ -612,20 +609,20 @@ ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work()
 		step_ = HANDLE_ERROR;
 		break;
 	    }
+	    if (getHbaseAccessStats())
+	      getHbaseAccessStats()->incUsedRows();
 	    if (hbaseAccessTdb().returnRow()) {
 		step_ = RETURN_ROW;
 		break;
 	    }
 	    matches_++;
 	    if (asyncOperation_) {
-                step_ = ASYNC_INSERT_COMPLETE;
+                step_ = COMPLETE_ASYNC_INSERT;
                 return WORK_CALL_AGAIN;
             }
             else {
 	        step_ = INSERT_CLOSE;
             }
-	    if (getHbaseAccessStats())
-	      getHbaseAccessStats()->incUsedRows();
 	  }
 	  break;
 
@@ -670,7 +667,7 @@ ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work()
 		  return 1;
 	      }
 	    if (asyncOperation_) {
-               step_ = ASYNC_INSERT_COMPLETE;
+               step_ = COMPLETE_ASYNC_INSERT;
                return WORK_CALL_AGAIN;
             }
             else
@@ -925,22 +922,20 @@ ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work()
                                        hbaseAccessTdb().getIsTrafLoadAutoFlush(), 
 				       asyncOperation_);
 
-	    if (setupError(retcode, "ExpHbaseInterface::insertRows"))
-	      {
+	    if (setupError(retcode, "ExpHbaseInterface::insertRows")) {
 		step_ = HANDLE_ERROR;
 		break;
-	      }
-            if (asyncOperation_) {
-		lastHandledStep_ = step_;
-                step_ = ASYNC_INSERT_COMPLETE;
-                break;
-            }
+	    }
 	    if (getHbaseAccessStats()) {
 		getHbaseAccessStats()->lobStats()->numReadReqs++;
 		getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_);
-	      }
+	    }
             rowsInserted_ += numRowsInVsbbBuffer_; 
-	    if (step_ == PROCESS_INSERT_FLUSH_AND_CLOSE)
+            if (asyncOperation_) {
+		lastHandledStep_ = step_;
+                step_ = COMPLETE_ASYNC_INSERT;
+            }
+            else if (step_ == PROCESS_INSERT_FLUSH_AND_CLOSE)
 	      step_ = FLUSH_BUFFERS;
 	    else if (step_ == PROCESS_INSERT_AND_CLOSE)
 	      step_ = INSERT_CLOSE;
@@ -948,7 +943,7 @@ ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work()
               step_ = ALL_DONE;
 	  }
 	  break;
-        case ASYNC_INSERT_COMPLETE:
+        case COMPLETE_ASYNC_INSERT:
           {
             if (resultArray_  == NULL)
                 resultArray_ = new (getHeap()) NABoolean[hbaseAccessTdb().getHbaseRowsetVsbbSize()];
@@ -981,7 +976,6 @@ ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work()
             }
             if (step_ == HANDLE_ERROR)
                break;
-            rowsInserted_ += numRowsInVsbbBuffer_;
             if (lastHandledStep_ == PROCESS_INSERT_FLUSH_AND_CLOSE)
               step_ = FLUSH_BUFFERS;
             else if (lastHandledStep_ == PROCESS_INSERT_AND_CLOSE)
@@ -1525,7 +1519,7 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work()
 
       case PROCESS_INSERT:
       {
-        short numRowsInBuffer = patchDirectRowBuffers();
+        numRowsInVsbbBuffer_ = patchDirectRowBuffers();
         retcode = ehi_->addToHFile(hbaseAccessTdb().getRowIDLen(),
                                    rowIDs_,
                                    rows_);
@@ -1535,12 +1529,12 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work()
            step_ = HANDLE_ERROR;
            break;
         }
-        rowsInserted_ += numRowsInBuffer;
+        rowsInserted_ += numRowsInVsbbBuffer_;
 
         if (getHbaseAccessStats())
         {
           getHbaseAccessStats()->lobStats()->numReadReqs++;
-          getHbaseAccessStats()->incUsedRows(numRowsInBuffer);
+          getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_);
         }
 
         step_ = ALL_DONE;
@@ -4068,7 +4062,7 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 	case PROCESS_DELETE:
 	case PROCESS_DELETE_AND_CLOSE:
 	  {
-            short numRowsInBuffer = patchDirectRowIDBuffers();
+            numRowsInVsbbBuffer_ = patchDirectRowIDBuffers();
 	    retcode = ehi_->deleteRows(table_,
                                        hbaseAccessTdb().getRowIDLen(),
                                        rowIDs_,
@@ -4083,12 +4077,12 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 	      }
             if (asyncOperation_) {
                lastHandledStep_ = step_;
-               step_ = ASYNC_OPERATION_COMPLETE;
+               step_ = COMPLETE_ASYNC_OPERATION;
                break;
             }
             if (getHbaseAccessStats()) {
 	      getHbaseAccessStats()->lobStats()->numReadReqs++;
-	      getHbaseAccessStats()->incUsedRows(numRowsInBuffer);
+	      getHbaseAccessStats()->incUsedRows(numRowsInVsbbBuffer_);
 	    }
 	    if (step_ == PROCESS_DELETE_AND_CLOSE)
 	      step_ = RS_CLOSE;
@@ -4100,9 +4094,9 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 	case PROCESS_SELECT:
 	  {
            if (numRowsInDirectBuffer() > 0) {
-              short numRowsInBuffer = patchDirectRowIDBuffers();
-	      retcode = ehi_->getRowsOpen(
-                            table_,
+		      numRowsInVsbbBuffer_ = patchDirectRowIDBuffers();
+		      retcode = ehi_->getRowsOpen(
+				    table_,
                             hbaseAccessTdb().getRowIDLen(),
                             rowIDs_, 
                             columns_);
@@ -4175,7 +4169,7 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 	      }
             if (asyncOperation_) {
                lastHandledStep_ = step_;
-               step_ = ASYNC_OPERATION_COMPLETE;
+               step_ = COMPLETE_ASYNC_OPERATION;
                break;
             }
             if (getHbaseAccessStats()) {
@@ -4188,7 +4182,7 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 	      step_ = DONE;
 	  }
 	  break;
-      case ASYNC_OPERATION_COMPLETE:
+      case COMPLETE_ASYNC_OPERATION:
          {
             if (resultArray_  == NULL)
                 resultArray_ = new (getHeap()) NABoolean[hbaseAccessTdb().getHbaseRowsetVsbbSize()];

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c66deca7/core/sql/generator/GenRelUpdate.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelUpdate.cpp b/core/sql/generator/GenRelUpdate.cpp
index 081f67f..cdba067 100644
--- a/core/sql/generator/GenRelUpdate.cpp
+++ b/core/sql/generator/GenRelUpdate.cpp
@@ -2658,8 +2658,16 @@ short HbaseInsert::codeGen(Generator *generator)
 
   if (getInsertType() == Insert::VSBB_INSERT_USER &&
               generator->oltOptInfo()->multipleRowsReturned())
-    downqueuelength = 400;
-
+  {
+    downqueuelength = getDefault(HBASE_ROWSET_VSBB_SIZE);
+    queue_index dq = 1;
+    queue_index bits = downqueuelength;
+    while (bits && dq < downqueuelength) {
+        bits = bits  >> 1;
+        dq = dq << 1;
+    }
+    downqueuelength = dq;
+  }
   char * tablename = NULL;
   if ((getTableDesc()->getNATable()->isHbaseRowTable()) ||
       (getTableDesc()->getNATable()->isHbaseCellTable()))

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/c66deca7/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 44375cb..72929ec 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -1770,7 +1770,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS,		"OFF"),
  DDui___(HBASE_REGION_SERVER_MAX_HEAP_SIZE,     "1024"), // in units of MB
 
   DDkwd__(HBASE_ROWSET_VSBB_OPT,		"ON"),
-  DDusht_(HBASE_ROWSET_VSBB_SIZE,        	"1000"),
+  DDusht_(HBASE_ROWSET_VSBB_SIZE,        	"1024"),
   DDflt0_(HBASE_SALTED_TABLE_MAX_FILE_SIZE,	"0"),
   DDkwd__(HBASE_SALTED_TABLE_SET_SPLIT_POLICY,	"ON"),
   DD_____(HBASE_SCHEMA,                         "HBASE"),


Mime
View raw message