drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [15/15] drill git commit: DRILL-1996: Add cancel method to Drill C++ connector
Date Tue, 01 Nov 2016 20:30:00 GMT
DRILL-1996: Add cancel method to Drill C++ connector

This closes #602


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/83513daf
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/83513daf
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/83513daf

Branch: refs/heads/master
Commit: 83513daf0903e0d94fcaad7b1ae4e8ad6272b494
Parents: 166c4ce
Author: Laurent Goujon <laurent@dremio.com>
Authored: Tue Oct 11 16:35:18 2016 -0700
Committer: Parth Chandra <parthc@apache.org>
Committed: Tue Nov 1 11:33:23 2016 -0700

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |   9 +-
 .../native/client/src/clientlib/drillClient.cpp |   8 +
 .../client/src/clientlib/drillClientImpl.cpp    | 174 +++++++++++--------
 .../client/src/clientlib/drillClientImpl.hpp    |   1 +
 .../client/src/include/drill/drillClient.hpp    |   7 +
 5 files changed, 124 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 306db56..2eeaf35 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -415,7 +415,14 @@ int main(int argc, char* argv[]) {
                     client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle);
                     client.registerSchemaChangeListener(&qryHandle, SchemaListener);
                     
-                    client.waitForResults();
+                     if(bTestCancel) {
+                        // Send cancellation request after 1 second
+                        boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+                        std::cout<< "\n Cancelling query: " << *queryInpIter << "\n" << std::endl;
+                        client.cancelQuery(qryHandle);
+                    } else {
+                        client.waitForResults();
+                    }
 
                     client.freeQueryResources(&qryHandle);
                 }

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 20a466e..b02f993 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -400,6 +400,14 @@ status_t DrillClient::executeQuery(const PreparedStatement& pstmt, pfnQueryResul
 	return QRY_SUCCESS;
 }
 
+void DrillClient::cancelQuery(QueryHandle_t handle) {
+	if (!handle) {
+		return;
+	}
+	DrillClientQueryHandle* pHandle = static_cast<DrillClientQueryHandle*>(handle);
+	pHandle->cancel();
+}
+
 void* DrillClient::getApplicationContext(QueryHandle_t handle){
     assert(handle!=NULL);
     return (static_cast<DrillClientQueryHandle*>(handle))->getApplicationContext();

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 7ecf910..51ae1a2 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -825,7 +825,6 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
 status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
     DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
-    ::exec::shared::QueryId qid;
     // Be a good client and send ack as early as possible.
     // Drillbit pushed the query result to the client, the client should send ack
     // whenever it receives the message
@@ -839,7 +838,7 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c
         qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
 
-        qid = ::exec::shared::QueryId(qr->query_id());
+        const ::exec::shared::QueryId& qid = qr->query_id();
         if(qid.part1()==0){
             DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
             delete allocatedBuffer;
@@ -855,90 +854,105 @@ status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, c
             return ret;
         }
 
-        //Validate the RPC message
-        std::string valErr;
-        if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
-            delete allocatedBuffer;
-            delete qr;
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
-            pDrillClientQueryResult->setQueryStatus(ret);
-            return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
-        }
-
-        //Build Record Batch here
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;)
-
-        pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
-        pDrillClientQueryResult->m_numBatches++;
-
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
-        pRecordBatch->build();
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numRecords "
-            << pRecordBatch->getNumRecords()  << std::endl;)
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qr->query_id())<<"recordBatch.numFields "
-            << pRecordBatch->getNumFields()  << std::endl;)
-
-        ret=pDrillClientQueryResult->setupColumnDefs(qr);
-        if(ret==QRY_SUCCESS_WITH_INFO){
-            pRecordBatch->schemaChanged(true);
-        }
-
-        pDrillClientQueryResult->setIsQueryPending(true);
-        if(pDrillClientQueryResult->m_bIsLastChunk){
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
-                <<  "Received last batch. " << std::endl;)
-            ret=QRY_NO_MORE_DATA;
+        // check if query has been cancelled
+        if (pDrillClientQueryResult->isCancelled()) {
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query cancellation " << std::endl;)
+        	delete qr;
+        	delete allocatedBuffer;
+        	ret =  QRY_CANCEL;
+        } else {
+        	//Validate the RPC message
+        	std::string valErr;
+        	if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
+        		delete allocatedBuffer;
+        		delete qr;
+        		DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
+        		pDrillClientQueryResult->setQueryStatus(ret);
+        		return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
+        	}
+
+        	//Build Record Batch here
+        	DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qid) << std::endl;)
+
+        	pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
+        	pDrillClientQueryResult->m_numBatches++;
+
+        	DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
+        	pRecordBatch->build();
+        	DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numRecords "
+        			<< pRecordBatch->getNumRecords()  << std::endl;)
+        	DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numFields "
+        			<< pRecordBatch->getNumFields()  << std::endl;)
+
+					ret=pDrillClientQueryResult->setupColumnDefs(qr);
+        	if(ret==QRY_SUCCESS_WITH_INFO){
+        		pRecordBatch->schemaChanged(true);
+        	}
+
+        	pDrillClientQueryResult->setIsQueryPending(true);
+        	if(pDrillClientQueryResult->m_bIsLastChunk){
+        		DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)
+        				<<  "Received last batch. " << std::endl;)
+            		ret=QRY_NO_MORE_DATA;
+        	}
+        	pDrillClientQueryResult->setQueryStatus(ret);
+        	ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
         }
-        pDrillClientQueryResult->setQueryStatus(ret);
-        ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
     } // release lock
-    if(ret==QRY_FAILURE){
-        sendCancel(&qid);
-        // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
-        // pushed on the wire before the cancel is processed.
-        pDrillClientQueryResult->setIsQueryPending(false);
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
-        pDrillClientQueryResult->setQueryStatus(ret);
-        removeQueryHandle(pDrillClientQueryResult);
-        removeQueryResult(pDrillClientQueryResult);
-        return ret;
+    if((ret==QRY_FAILURE || ret==QRY_CANCELED) && pDrillClientQueryResult != NULL){
+        return handleQryCancellation(ret, pDrillClientQueryResult);
     }
     return ret;
 }
 
 status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
     DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
+	DrillClientQueryResult* pDrillClientQueryResult=NULL;
     status_t ret=QRY_SUCCESS;
 
     // make sure to deallocate buffer
     boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
-    boost::lock_guard<boost::mutex> lock(m_dcMutex);
-    for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
-        DrillClientQueryResult* pDrillClientQueryResult=it->second;
-        std::string qidString = (pDrillClientQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pDrillClientQueryResult->m_pQueryId):std::string("NULL");
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pDrillClientQueryResult->m_coordinationId
-        << " QueryId: "<< qidString << std::endl;)
-    }
-    if(msg.m_coord_id==0){
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
-        return QRY_SUCCESS;
-    }
-    std::map<int, DrillClientQueryHandle*>::const_iterator it;
-    it=this->m_queryHandles.find(msg.m_coord_id);
-    if(it!=this->m_queryHandles.end()){
-        DrillClientQueryResult* pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
-        if (!pDrillClientQueryResult) {
-            return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
-        }
-        exec::shared::QueryId *qid = new exec::shared::QueryId;
-        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
-        qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
-        m_queryResults[qid]=pDrillClientQueryResult;
-        //save queryId allocated here so we can free it later
-        pDrillClientQueryResult->setQueryId(qid);
-    }else{
-        return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    {
+    	boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+    	if(msg.m_coord_id==0){
+    		DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
+      		return QRY_SUCCESS;
+    	}
+
+    	for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
+    		DrillClientQueryResult* pQueryResult=it->second;
+    		std::string qidString = (pQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pQueryResult->m_pQueryId):std::string("NULL");
+    		DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pQueryResult->m_coordinationId
+    				<< " QueryId: "<< qidString << std::endl;)
+    	}
+
+    	std::map<int, DrillClientQueryHandle*>::const_iterator it;
+    	it=this->m_queryHandles.find(msg.m_coord_id);
+    	if(it==this->m_queryHandles.end()){
+    		return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    	}
+    	pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
+    	if (!pDrillClientQueryResult) {
+    		return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
+    	}
+
+    	// Check for cancellation to notify
+    	if (pDrillClientQueryResult->isCancelled()) {
+    		ret = QRY_CANCELED;
+    	}
+    	else {
+    		exec::shared::QueryId *qid = new exec::shared::QueryId;
+    		DRILL_MT_LOG(DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
+    		qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
+    		DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
+    		m_queryResults[qid]=pDrillClientQueryResult;
+    		//save queryId allocated here so we can free it later
+    		pDrillClientQueryResult->setQueryId(qid);
+    	}
+    }
+    if (ret == QRY_CANCELED && pDrillClientQueryResult != NULL) {
+    	return handleQryCancellation(ret, pDrillClientQueryResult);
     }
     return ret;
 }
@@ -1486,6 +1500,18 @@ status_t DrillClientImpl::handleQryError(status_t status,
     return status;
 }
 
+status_t DrillClientImpl::handleQryCancellation(status_t status, DrillClientQueryResult* pQueryHandle) {
+	sendCancel(&pQueryHandle->getQueryId());
+	// Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
+	// pushed on the wire before the cancel is processed.
+	pQueryHandle->setIsQueryPending(false);
+	DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
+	pQueryHandle->setQueryStatus(status);
+	removeQueryResult(pQueryHandle);
+	removeQueryHandle(pQueryHandle);
+	return status;
+}
+
 void DrillClientImpl::broadcastError(DrillClientError* pErr){
     if(pErr!=NULL){
         std::map<int, DrillClientQueryHandle*>::const_iterator iter;

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index f9d0779..8da37b6 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -489,6 +489,7 @@ class DrillClientImpl : public DrillClientImplBase{
         status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
         status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
         connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
+        status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult);
         status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);
         status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle);
         // handle query state indicating query is COMPLETED or CANCELED

http://git-wip-us.apache.org/repos/asf/drill/blob/83513daf/contrib/native/client/src/include/drill/drillClient.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/drillClient.hpp b/contrib/native/client/src/include/drill/drillClient.hpp
index 5e59885..29ae6c2 100644
--- a/contrib/native/client/src/include/drill/drillClient.hpp
+++ b/contrib/native/client/src/include/drill/drillClient.hpp
@@ -1276,6 +1276,13 @@ class DECLSPEC_DRILL_CLIENT DrillClient{
         status_t executeQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle);
 
         /*
+         * Cancel a query.
+         *
+         * @param[in] the handle of the query to cancel
+         */
+        void cancelQuery(QueryHandle_t handle);
+
+        /*
          * The client application should call this function to wait for results if it has registered a
          * listener.
          */


Mime
View raw message