trafodion-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dbirds...@apache.org
Subject [2/4] incubator-trafodion git commit: [TRAFODION-1752] JVM goes out-of-memory for hundreds of upsert statement in one session
Date Thu, 21 Jan 2016 01:02:24 GMT
[TRAFODION-1752] JVM goes out-of-memory for hundreds of upsert statements in one session

It was observed that there are more conditions under which an HTableClient object
could be leaked. Fixed the code in all those places too.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/b76c3435
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/b76c3435
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/b76c3435

Branch: refs/heads/master
Commit: b76c3435d1dc4286ebd7426dd55b8395da390854
Parents: d90dde7
Author: selvaganesang <selva.govindarajan@esgyn.com>
Authored: Tue Jan 19 06:10:06 2016 +0000
Committer: selvaganesang <selva.govindarajan@esgyn.com>
Committed: Tue Jan 19 06:10:06 2016 +0000

----------------------------------------------------------------------
 core/sql/executor/ExHbaseAccess.h  |  2 ++
 core/sql/executor/ExHbaseIUD.cpp   | 59 +++++++++++++++++----------------
 core/sql/exp/ExpHbaseInterface.cpp | 38 ++++++++++-----------
 3 files changed, 51 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b76c3435/core/sql/executor/ExHbaseAccess.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.h b/core/sql/executor/ExHbaseAccess.h
index 85c50be..093755e 100644
--- a/core/sql/executor/ExHbaseAccess.h
+++ b/core/sql/executor/ExHbaseAccess.h
@@ -845,6 +845,7 @@ public:
     , DONE
     , ALL_DONE
     , COMPLETE_ASYNC_INSERT
+    , DONE_WITH_CLOSE
 
   } step_;
 
@@ -1157,6 +1158,7 @@ public:
     , APPLY_PRED
     , RETURN_ROW
     , COMPLETE_ASYNC_OPERATION
+    , DONE_WITH_CLOSE
   } step_;
 
   ExHbaseAccessSQRowsetTcb( const ExHbaseAccessTdb &tdb,

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b76c3435/core/sql/executor/ExHbaseIUD.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp
index 1744251..86f131b 100644
--- a/core/sql/executor/ExHbaseIUD.cpp
+++ b/core/sql/executor/ExHbaseIUD.cpp
@@ -51,7 +51,7 @@ ExWorkProcRetcode ExHbaseAccessInsertTcb::work()
     {
       ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
       if (pentry_down->downState.request == ex_queue::GET_NOMORE)
-	step_ = DONE;
+	step_ = DONE_WITH_CLOSE;
 
       switch (step_)
 	{
@@ -201,16 +201,17 @@ ExWorkProcRetcode ExHbaseAccessInsertTcb::work()
 	  {
 	    if (handleError(rc))
 	      return rc;
-
-	    step_ = DONE;
+	    step_ = DONE_WITH_CLOSE;
 	  }
 	  break;
 
 	case DONE:
+        case DONE_WITH_CLOSE:
 	  {
+            if (step_ == DONE_WITH_CLOSE)
+               ehi_->close();
 	    if (handleDone(rc, matches_))
 	      return rc;
-
 	    step_ = NOT_STARTED;
 	  }
 	  break;
@@ -238,7 +239,7 @@ ExWorkProcRetcode ExHbaseAccessInsertRowwiseTcb::work()
     {
       ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
       if (pentry_down->downState.request == ex_queue::GET_NOMORE)
-	step_ = DONE;
+	step_ = DONE_WITH_CLOSE;
 
       switch (step_)
 	{
@@ -371,13 +372,15 @@ ExWorkProcRetcode ExHbaseAccessInsertRowwiseTcb::work()
 	  {
 	    if (handleError(rc))
 	      return rc;
-
-	    step_ = DONE;
+	    step_ = DONE_WITH_CLOSE;
 	  }
 	  break;
 
 	case DONE:
+        case DONE_WITH_CLOSE:
 	  {
+            if (step_ == DONE_WITH_CLOSE)
+               ehi_->close();
 	    if (handleDone(rc, matches_))
 	      return rc;
 
@@ -408,7 +411,7 @@ ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work()
     {
       ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
       if (pentry_down->downState.request == ex_queue::GET_NOMORE)
-	step_ = DONE;
+	step_ = DONE_WITH_CLOSE;
 
       switch (step_)
 	{
@@ -698,15 +701,15 @@ ExWorkProcRetcode ExHbaseAccessInsertSQTcb::work()
 	  {
 	    if (handleError(rc))
 	      return rc;
-
-	    retcode = ehi_->close();
-
-	    step_ = DONE;
+	    step_ = DONE_WITH_CLOSE;
 	  }
 	  break;
 
 	case DONE:
+        case DONE_WITH_CLOSE:
 	  {
+            if (step_ == DONE_WITH_CLOSE)
+               ehi_->close();
 	    if (NOT hbaseAccessTdb().computeRowsAffected())
 	      matches_ = 0;
 
@@ -754,19 +757,19 @@ ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work()
 
       ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
       if (pentry_down->downState.request == ex_queue::GET_NOMORE)
-	step_ = ALL_DONE;
+	step_ = DONE_WITH_CLOSE;
      else if (pentry_down->downState.request == ex_queue::GET_EOD)
           if (currRowNum_ > rowsInserted_)
 	{
 	  step_ = PROCESS_INSERT_FLUSH_AND_CLOSE;
 
 	}
-          else
-          {
+        else
+        {
             if (lastHandledStep_ == ALL_DONE)
                matches_=0;
             step_ = ALL_DONE;
-          }
+        }
       switch (step_)
 	{
 	case NOT_STARTED:
@@ -1007,10 +1010,7 @@ ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work()
 	  {
 	    if (handleError(rc))
 	      return rc;
-
-	    retcode = ehi_->close();
-
-	    step_ = ALL_DONE;
+	    step_ = DONE_WITH_CLOSE;
 	  }
 	  break;
 
@@ -1029,8 +1029,11 @@ ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work()
 	  break;
 
 	case DONE:
+        case DONE_WITH_CLOSE:
 	case ALL_DONE:
 	  {
+            if (step_ == DONE_WITH_CLOSE)
+               ehi_->close();
 	    if (NOT hbaseAccessTdb().computeRowsAffected())
 	      matches_ = 0;
 
@@ -1056,7 +1059,7 @@ ExWorkProcRetcode ExHbaseAccessUpsertVsbbSQTcb::work()
 		return WORK_CALL_AGAIN;
 	      }
 
-	    if (handleDone(rc, (step_ == ALL_DONE ? matches_ : 0)))
+	    if (handleDone(rc, (step_ == ALL_DONE  ? matches_ : 0)))
 	      return rc;
 	    lastHandledStep_ = step_;
 
@@ -3865,7 +3868,7 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 
       ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
       if (pentry_down->downState.request == ex_queue::GET_NOMORE)
-	step_ = ALL_DONE;
+	step_ = DONE_WITH_CLOSE;
       else if (pentry_down->downState.request == ex_queue::GET_EOD) {
          if (numRowsInDirectBuffer() > 0) {
             if (hbaseAccessTdb().getAccessType() == ComTdbHbaseAccess::UPDATE_)
@@ -4274,9 +4277,7 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 	  {
 	    if (handleError(rc))
 	      return rc;
-
-	    retcode = ehi_->close();
-	    step_ = ALL_DONE;
+	    step_ = DONE_WITH_CLOSE;
 	  }
 	  break;
         case ROW_DONE:
@@ -4287,8 +4288,11 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
           }
           break;
 	case DONE:
+        case DONE_WITH_CLOSE:
 	case ALL_DONE:
 	  {
+            if (step_ == DONE_WITH_CLOSE)
+               ehi_->close();
 	    if (NOT hbaseAccessTdb().computeRowsAffected())
 	      matches_ = 0;
 
@@ -4324,11 +4328,8 @@ ExWorkProcRetcode ExHbaseAccessSQRowsetTcb::work()
 
 	    if (step_ == DONE)
 	       step_ = SETUP_UMD;
-	    else { 
-               // It is ok to call close more than once
-               ehi_->close();
+	    else  
 	       step_ = NOT_STARTED;
-            }
 	  }
 	  break;
 	} // switch

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/b76c3435/core/sql/exp/ExpHbaseInterface.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpHbaseInterface.cpp b/core/sql/exp/ExpHbaseInterface.cpp
index 8a43330..9bea32b 100644
--- a/core/sql/exp/ExpHbaseInterface.cpp
+++ b/core/sql/exp/ExpHbaseInterface.cpp
@@ -394,7 +394,10 @@ Lng32 ExpHbaseInterface_JNI::cleanup()
       client_->releaseHTableClient(htc_);
       htc_ = NULL;    
     }
-
+    if (asyncHtc_) {
+       client_->releaseHTableClient(asyncHtc_);
+       asyncHtc_ = NULL;
+    }
     if (hblc_)
     {
       client_->releaseHBulkLoadClient(hblc_);
@@ -739,11 +742,11 @@ Lng32 ExpHbaseInterface_JNI::deleteRow(
     transID = getTransactionIDFromContext();
   retCode_ = client_->deleteRow((NAHeap *)heap_, tblName.val, hbs_, useTRex_, transID,
row, columns, timestamp, asyncOperation, &htc);
   if (retCode_ != HBC_OK) {
-    asyncHtc_ = NULL;
     return -HBASE_ACCESS_ERROR;
   }
   else {
-    asyncHtc_ = htc;
+    if (asyncOperation)
+       asyncHtc_ = htc;
     return HBASE_ACCESS_SUCCESS;
   } 
 }
@@ -766,11 +769,11 @@ Lng32 ExpHbaseInterface_JNI::deleteRows(
     transID = getTransactionIDFromContext();
   retCode_ = client_->deleteRows((NAHeap *)heap_, tblName.val, hbs_, useTRex_, transID,
rowIDLen, rowIDs,timestamp, asyncOperation, &htc);
   if (retCode_ != HBC_OK) {
-    asyncHtc_ = NULL;
     return -HBASE_ACCESS_ERROR;
   }
   else {
-    asyncHtc_ = htc;
+    if (asyncOperation)
+       asyncHtc_ = htc;
     return HBASE_ACCESS_SUCCESS;
   } 
 }
@@ -795,15 +798,14 @@ Lng32 ExpHbaseInterface_JNI::checkAndDeleteRow(
   retCode_ = client_->checkAndDeleteRow((NAHeap *)heap_, tblName.val, hbs_, useTRex_,
transID, rowID, columnToCheck, 
                      columnValToCheck,timestamp, asyncOperation, &htc);
   if (retCode_ == HBC_ERROR_CHECKANDDELETEROW_NOTFOUND) {
-    asyncHtc_ = NULL;
     return HBASE_ROW_NOTFOUND_ERROR;
   } else
   if (retCode_ != HBC_OK) {
-    asyncHtc_ = NULL;
     return -HBASE_ACCESS_ERROR;
   }
   else {
-    asyncHtc_ = htc;
+    if (asyncOperation)
+       asyncHtc_ = htc;
     return HBASE_ACCESS_SUCCESS;
   } 
 }
@@ -828,11 +830,11 @@ Lng32 ExpHbaseInterface_JNI::insertRow(
   retCode_ = client_->insertRow((NAHeap *)heap_, tblName.val, hbs_,
                       useTRex_, transID, rowID, row, timestamp, checkAndPut, asyncOperation,
&htc);
   if (retCode_ != HBC_OK) {
-    asyncHtc_ = NULL;
     return -HBASE_ACCESS_ERROR;
   }
   else {
-    asyncHtc_ = htc;
+    if (asyncOperation)
+       asyncHtc_ = htc;
     return HBASE_ACCESS_SUCCESS;
   }
 }
@@ -858,11 +860,11 @@ Lng32 ExpHbaseInterface_JNI::insertRows(
   retCode_ = client_->insertRows((NAHeap *)heap_, tblName.val, hbs_,
                       useTRex_, transID, rowIDLen, rowIDs, rows, timestamp, autoFlush, asyncOperation,
&htc);
   if (retCode_ != HBC_OK) {
-    asyncHtc_ = NULL;
     return -HBASE_ACCESS_ERROR;
   }
   else {
-    asyncHtc_ = htc;
+    if (asyncOperation)
+       asyncHtc_ = htc;
     return HBASE_ACCESS_SUCCESS;
   } 
 }
@@ -1180,7 +1182,7 @@ Lng32 ExpHbaseInterface_JNI::checkAndInsertRow(
 	  const int64_t timestamp,
           NABoolean asyncOperation)
 {
-  HTableClient_JNI *htc;
+  HTableClient_JNI *htc = NULL;
   Int64 transID; 
   NABoolean checkAndPut = TRUE;
 
@@ -1192,16 +1194,15 @@ Lng32 ExpHbaseInterface_JNI::checkAndInsertRow(
                       useTRex_, transID, rowID, row, timestamp, checkAndPut, asyncOperation,
&htc);
 
   if (retCode_ == HBC_ERROR_INSERTROW_DUP_ROWID) {
-     asyncHtc_ = htc; 
      return HBASE_DUP_ROW_ERROR;
   }
   else 
   if (retCode_ != HBC_OK) {
-    asyncHtc_ = NULL;
     return -HBASE_ACCESS_ERROR;
   }
   else {
-    asyncHtc_ = htc; 
+    if (asyncOperation)
+        asyncHtc_ = htc;
     return HBASE_ACCESS_SUCCESS;
   }
 }
@@ -1228,14 +1229,13 @@ Lng32 ExpHbaseInterface_JNI::checkAndUpdateRow(
                       useTRex_, transID, rowID, row, columnToCheck, colValToCheck, timestamp,
asyncOperation, &htc);
 
   if (retCode_  == HBC_ERROR_CHECKANDUPDATEROW_NOTFOUND) {
-     asyncHtc_ = htc; 
      return HBASE_ROW_NOTFOUND_ERROR;
   } else 
   if (retCode_ != HBC_OK) {
-    asyncHtc_ = NULL;
     return -HBASE_ACCESS_ERROR;
   } else {
-    asyncHtc_ = htc; 
+    if (asyncOperation)
+       asyncHtc_ = htc; 
     return HBASE_ACCESS_SUCCESS;
   }
 }


Mime
View raw message