drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j..@apache.org
Subject [18/27] drill git commit: DRILL-5221: Send cancel message as soon as possible in C++ connector
Date Thu, 02 Mar 2017 20:59:45 GMT
DRILL-5221: Send cancel message as soon as possible in C++ connector

In C++ connector, try to send cancel request to the server as soon as
possible, which means when receiving the queryId or when requested by the
user if queryId has already been received.

close #733


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/20a374c5
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/20a374c5
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/20a374c5

Branch: refs/heads/master
Commit: 20a374c5bfabb49f4c8b3144a5a8529d17ee03fd
Parents: c81f588
Author: Laurent Goujon <laurent@dremio.com>
Authored: Tue Jan 24 18:47:47 2017 -0800
Committer: Jinfeng Ni <jni@apache.org>
Committed: Wed Mar 1 23:15:33 2017 -0800

----------------------------------------------------------------------
 .../client/src/clientlib/drillClientImpl.cpp    | 48 +++++++++++---------
 .../client/src/clientlib/drillClientImpl.hpp    |  2 +-
 2 files changed, 27 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/20a374c5/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 417fe80..ce3ab63 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -1012,7 +1012,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer,
c
             DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query cancellation " <<
std::endl;)
         	delete qr;
         	delete allocatedBuffer;
-        	ret =  QRY_CANCEL;
+        	ret =  QRY_CANCELED;
         } else {
         	//Validate the RPC message
         	std::string valErr;
@@ -1689,7 +1689,6 @@ status_t DrillClientImpl::handleQryCancellation(status_t status, DrillClientQuer
 	pQueryHandle->setIsQueryPending(false);
 	DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
 	pQueryHandle->setQueryStatus(status);
-	removeQueryResult(pQueryHandle);
 	removeQueryHandle(pQueryHandle);
 	return status;
 }
@@ -1732,25 +1731,23 @@ status_t DrillClientImpl::handleTerminatedQryState(
 
 void DrillClientImpl::removeQueryHandle(DrillClientQueryHandle* pQueryHandle){
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    if(!m_queryHandles.empty()){
-        for(std::map<int, DrillClientQueryHandle*>::const_iterator iter=m_queryHandles.begin();
iter!=m_queryHandles.end(); iter++) {
-            if(pQueryHandle==(DrillClientQueryHandle*)iter->second){
-                m_queryHandles.erase(iter->first);
-                break;
-            }
-        }
+    // Removing first the base handle
+    for(std::map<int, DrillClientQueryHandle*>::const_iterator iter=m_queryHandles.begin();
iter!=m_queryHandles.end(); iter++) {
+    	if(pQueryHandle==(DrillClientQueryHandle*)iter->second){
+    		m_queryHandles.erase(iter->first);
+    		break;
+    	}
     }
-}
 
-void DrillClientImpl::removeQueryResult(DrillClientQueryResult* pQueryResult){
-    boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    if(!m_queryResults.empty()){
-        for(std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::const_iterator
it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
-            if(pQueryResult==(DrillClientQueryResult*)it->second){
-                m_queryResults.erase(it->first);
-                break;
-            }
-        }
+    // if the query handle is a result handle, m_queryResults also need to be cleaned.
+    DrillClientQueryResult* pQueryResult = dynamic_cast<DrillClientQueryResult*>(pQueryHandle);
+    if (pQueryResult) {
+    	for(std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::const_iterator
it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
+    		if(pQueryResult==(DrillClientQueryResult*)it->second){
+    			m_queryResults.erase(it->first);
+    			break;
+    		}
+    	}
     }
 }
 
@@ -1949,6 +1946,16 @@ RecordBatch*  DrillClientQueryResult::peekNext(){
     return pRecordBatch;
 }
 
+void DrillClientQueryResult::cancel() {
+	// Calling parent class
+	DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>::cancel();
+
+	// If queryId has already been received, don't wait to send the
+	// cancellation message
+	if (this->m_pQueryId) {
+		this->client().handleQryCancellation(QRY_CANCELED, this);
+	}
+}
 RecordBatch*  DrillClientQueryResult::getNext() {
     RecordBatch* pRecordBatch=NULL;
     boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
@@ -2073,9 +2080,6 @@ void DrillClientQueryResult::clearAndDestroy(){
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " <<
debugPrintQid(*this->m_pQueryId) << std::endl;)
     }
 
-    //Tell the parent to remove this from its lists
-    this->client().removeQueryResult(this);
-
     //clear query id map entries.
     if(this->m_pQueryId!=NULL){
         delete this->m_pQueryId; this->m_pQueryId=NULL;

http://git-wip-us.apache.org/repos/asf/drill/blob/20a374c5/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 5eb850d..bc6503d 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -210,6 +210,7 @@ class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListen
         m_pSchemaListener=l;
     }
 
+    void cancel();
     // Synchronous call to get data. Caller assumes ownership of the record batch
     // returned and it is assumed to have been consumed.
     RecordBatch*  getNext();
@@ -519,7 +520,6 @@ class DrillClientImpl : public DrillClientImplBase{
                 DrillClientQueryResult* pQueryResult);
         void broadcastError(DrillClientError* pErr);
         void removeQueryHandle(DrillClientQueryHandle* pQueryHandle);
-        void removeQueryResult(DrillClientQueryResult* pQueryResult);
         void sendAck(const rpc::InBoundRpcMessage& msg, bool isOk);
         void sendCancel(const exec::shared::QueryId* pQueryId);
 


Mime
View raw message