drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject drill git commit: DRILL-1568: C++ Client - Handle Query Cancel
Date Sat, 03 Jan 2015 00:58:18 GMT
Repository: drill
Updated Branches:
  refs/heads/master 8afe10f20 -> c051bbd88


DRILL-1568: C++ Client - Handle Query Cancel


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c051bbd8
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c051bbd8
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c051bbd8

Branch: refs/heads/master
Commit: c051bbd888ad1db5de54283d0e80ac9a08accac5
Parents: 8afe10f
Author: Parth Chandra <pchandra@maprtech.com>
Authored: Mon Nov 10 14:20:57 2014 -0800
Committer: Parth Chandra <pchandra@maprtech.com>
Committed: Fri Jan 2 16:54:17 2015 -0800

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |  13 +-
 .../client/src/clientlib/drillClientImpl.cpp    | 210 +++++++++++++------
 .../client/src/clientlib/drillClientImpl.hpp    |  23 +-
 contrib/native/client/src/clientlib/errmsgs.cpp |   8 +-
 .../native/client/src/include/drill/common.hpp  |   1 +
 5 files changed, 178 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/c051bbd8/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 7b98bc9..2d89223 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -67,6 +67,7 @@ Drill::status_t QueryResultsListener(void* ctx, Drill::RecordBatch* b, Drill::Dr
     if(!err){
         assert(b!=NULL);
         b->print(std::cout, 0); // print all rows
+        std::cout << "DATA RECEIVED ..." << std::endl;
         delete b; // we're done with this batch, we can delete it
         if(bTestCancel){
             return Drill::QRY_FAILURE;
@@ -304,23 +305,27 @@ int main(int argc, char* argv[]) {
         //DrillClient::initLogging("/var/log/drill/", l);
         // To log to stderr
         Drill::DrillClient::initLogging(NULL, l);
-        Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows us to hold at least two record batches.
+        //Drill::DrillClientConfig::setBufferLimit(2*1024*1024); // 2MB. Allows us to hold at least two record batches.
+        int nQueries=queryInputs.size();
+        Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches.
 
         if(client.connect(connectStr.c_str(), schema.c_str())!=Drill::CONN_SUCCESS){
             std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
             return -1;
         }
         std::cout<< "Connected!\n" << std::endl;
-
         if(api=="sync"){
             Drill::DrillClientError* err=NULL;
             Drill::status_t ret;
+            int nQueries=0;
             for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
                 Drill::RecordIterator* pRecIter = client.submitQuery(type, *queryInpIter, err);
                 if(pRecIter!=NULL){
                     recordIterators.push_back(pRecIter);
+                    nQueries++;
                 }
             }
+            Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches.
             size_t row=0;
             for(recordIterIter = recordIterators.begin(); recordIterIter != recordIterators.end(); recordIterIter++) {
                 // get fields.
@@ -346,8 +351,8 @@ int main(int argc, char* argv[]) {
                     printf("\n");
                     if(bTestCancel && row%100==1){
                         pRecIter->cancel();
-                        printf("Application canceled the query.\n");
-                }
+                        printf("Application cancelled the query.\n");
+                    }
                 }
                 if(ret!=Drill::QRY_NO_MORE_DATA && ret!=Drill::QRY_CANCEL){
                     std::cerr<< pRecIter->getError() << std::endl;

http://git-wip-us.apache.org/repos/asf/drill/blob/c051bbd8/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index cc70020..23dc407 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -158,7 +158,7 @@ connectionStatus_t DrillClientImpl::sendSync(OutBoundRpcMessage& msg){
     boost::system::error_code ec;
     size_t s=m_socket.write_some(boost::asio::buffer(m_wbuf), ec);
     if(!ec && s!=0){
-    return CONN_SUCCESS;
+        return CONN_SUCCESS;
     }else{
         return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str()));
     }
@@ -465,6 +465,7 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
     // Drillbit pushed the query result to the client, the client should send ack
     // whenever it receives the message
     sendAck(msg, true);
+    RecordBatch* pRecordBatch=NULL;
     {
         boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
         exec::shared::QueryResult* qr = new exec::shared::QueryResult; //Record Batch will own this object and free it up.
@@ -481,62 +482,29 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         if(it!=this->m_queryResults.end()){
             pDrillClientQueryResult=(*it).second;
         }else{
-            assert(0);
-            //assert might be compiled away in a release build. So return an error to the app.
-            status_t ret= handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_OUTOFORDER), NULL);
+            ret=processCancelledQueryResult(qid, qr);
+            DRILL_LOG(LOG_TRACE) << "Cleaning up resource allocated for canceled quyery." << std::endl;
             delete qr;
+            delete allocatedBuffer;
             return ret;
         }
         DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
             debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;
 
-        // Drillbit may send query state message which does not contain any
+        // Drillbit may send a query state change message which does not contain any
         // record batch.
-        if (qr->has_query_state()) {
-            ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
-            pDrillClientQueryResult->setQueryStatus(ret);
-            switch(qr->query_state()) {
-                case exec::shared::QueryResult_QueryState_FAILED:
-                case exec::shared::QueryResult_QueryState_UNKNOWN_QUERY:
-                    // get the error message from protobuf and handle errors
-                    ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult);
-                    delete allocatedBuffer;
-                    delete qr;
-                    break;
-
-                case exec::shared::QueryResult_QueryState_PENDING:
-                case exec::shared::QueryResult_QueryState_RUNNING:
-                    // Ignore these state messages since they means the query is not completed.
-                    // I have not observed those messages in testing though.
-                    break;
-                    
-                // m_pendingRequests should be decremented when the query is
-                // canceled or completed
-                // in both cases, fall back to free mememory
-                case exec::shared::QueryResult_QueryState_CANCELED:
-                    ret=handleTerminatedQryState(ret,
-                            getMessage(ERR_QRY_CANCELED),
-                            pDrillClientQueryResult);
-                case exec::shared::QueryResult_QueryState_COMPLETED:
-                    ret=handleTerminatedQryState(ret,
-                            getMessage(ERR_QRY_COMPLETED),
-                            pDrillClientQueryResult);
-                    delete allocatedBuffer;
-                    delete qr;
-                    break;
-
-                default:
-                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Unknown Query State.\n";
-                    ret=handleQryError(QRY_INTERNAL_ERROR,
-                            getMessage(ERR_QRY_UNKQRYSTATE),
-                            pDrillClientQueryResult);
-                    delete allocatedBuffer;
-                    delete qr;
-                    break;
-            }
+        if (qr->has_query_state() &&
+                qr->query_state() != exec::shared::QueryResult_QueryState_RUNNING &&
+                qr->query_state() != exec::shared::QueryResult_QueryState_PENDING) {
+            ret=processQueryStatusResult(qr, pDrillClientQueryResult);
+            delete allocatedBuffer;
+            delete qr;
             return ret;
         }else{
-            DRILL_LOG(LOG_WARNING) << "DrillClientImpl::processQueryResult: Query State was not set (assuming a query with no result set.\n";
+            // Normal query results come back with query_state not set.
+            // Actually this is not strictly true. The query state is set to
+            // 0(i.e. PENDING), but protobuf thinks this means the value is not set.
+            DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";
         }
 
         //Validate the RPC message
@@ -550,9 +518,9 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         }
 
         //Build Record Batch here
-        DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;
+        DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qr->query_id()) << std::endl;
 
-        RecordBatch* pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
+        pRecordBatch= new RecordBatch(qr, allocatedBuffer,  msg.m_dbody);
         pDrillClientQueryResult->m_numBatches++;
 
         DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;
@@ -572,6 +540,12 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
         pDrillClientQueryResult->m_bIsQueryPending=true;
         pDrillClientQueryResult->m_bIsLastChunk=qr->is_last_chunk();
         pfnQueryResultsListener pResultsListener=pDrillClientQueryResult->m_pResultsListener;
+        if(pDrillClientQueryResult->m_bIsLastChunk){
+            DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
+                <<  "Received last batch. " << std::endl;
+            ret=QRY_NO_MORE_DATA;
+        }
+        pDrillClientQueryResult->setQueryStatus(ret);
         if(pResultsListener!=NULL){
             ret = pResultsListener(pDrillClientQueryResult, pRecordBatch, NULL);
         }else{
@@ -582,23 +556,50 @@ status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr  allocatedBuffer
     } // release lock
     if(ret==QRY_FAILURE){
         sendCancel(&qid);
-        {
-            boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
-            m_pendingRequests--;
-        }
+        // Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
+        // pushed on the wire before the cancel is processed.
         pDrillClientQueryResult->m_bIsQueryPending=false;
         DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;
         pDrillClientQueryResult->setQueryStatus(ret);
+        clearMapEntries(pDrillClientQueryResult);
         return ret;
     }
-    if(pDrillClientQueryResult->m_bIsLastChunk){
-        DRILL_LOG(LOG_DEBUG) << debugPrintQid(*pDrillClientQueryResult->m_pQueryId)
-                <<  "Received last batch. " << std::endl;
-        ret=QRY_NO_MORE_DATA;
-        pDrillClientQueryResult->setQueryStatus(ret);
-        return ret;
+    return ret;
+}
+
+status_t DrillClientImpl::processCancelledQueryResult(exec::shared::QueryId& qid, exec::shared::QueryResult* qr){
+    status_t ret=QRY_SUCCESS;
+    // look in cancelled queries
+    DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(qr->query_id()) << " has been cancelled." << std::endl;
+    std::set<exec::shared::QueryId*, compareQueryId>::iterator it2;
+    exec::shared::QueryId* pQid=NULL;//
+    it2=this->m_cancelledQueries.find(&qid);
+    if(it2!=this->m_cancelledQueries.end()){
+        pQid=(*it2);
+        if(qr->has_query_state()){
+            ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
+            if(qr->query_state()==exec::shared::QueryResult_QueryState_COMPLETED
+                    || qr->query_state()==exec::shared::QueryResult_QueryState_CANCELED
+                    || qr->query_state()==exec::shared::QueryResult_QueryState_FAILED) {
+                this->m_pendingRequests--;
+                this->m_cancelledQueries.erase(it2);
+                delete pQid;
+                DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(qr->query_id()) << " completed." << std::endl;
+                DRILL_LOG(LOG_DEBUG) << "Pending requests - " << this->m_pendingRequests << std::endl;
+            }
+        }
+    }else{
+        status_t ret=QRY_FAILED;
+        if(qr->has_query_state() && qr->query_state()==exec::shared::QueryResult_QueryState_COMPLETED){
+            ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
+        }else if(!qr->has_query_state() && qr->row_count()==0){
+            ret=QRY_SUCCESS;
+        }else{
+            //ret= handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_OUTOFORDER), NULL);
+            DRILL_LOG(LOG_DEBUG) << "Pending requests - " << getMessage(ERR_QRY_OUTOFORDER) << std::endl;
+            ret= QRY_SUCCESS;
+        }
     }
-    pDrillClientQueryResult->setQueryStatus(ret);
     return ret;
 }
 
@@ -613,9 +614,9 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
     if(it!=this->m_queryIds.end()){
         pDrillClientQueryResult=(*it).second;
         exec::shared::QueryId *qid = new exec::shared::QueryId;
-        DRILL_LOG(LOG_TRACE)  << "Received Query Handle" << msg.m_pbody.size() << std::endl;
+        DRILL_LOG(LOG_TRACE)  << "Received Query Handle " << msg.m_pbody.size() << std::endl;
         qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
-        DRILL_LOG(LOG_TRACE) << qid->DebugString() << std::endl;
+        DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;
         m_queryResults[qid]=pDrillClientQueryResult;
         //save queryId allocated here so we can free it later
         pDrillClientQueryResult->setQueryId(qid);
@@ -627,6 +628,53 @@ status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, InB
     return ret;
 }
 
+status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr,
+        DrillClientQueryResult* pDrillClientQueryResult){
+        status_t ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
+        pDrillClientQueryResult->setQueryStatus(ret);
+        pDrillClientQueryResult->setQueryState(qr->query_state());
+        switch(qr->query_state()) {
+            case exec::shared::QueryResult_QueryState_FAILED:
+            case exec::shared::QueryResult_QueryState_UNKNOWN_QUERY:
+                {
+                    // get the error message from protobuf and handle errors
+                    ret=handleQryError(ret, qr->error(0), pDrillClientQueryResult);
+                }
+                break;
+
+                // m_pendingRequests should be decremented when the query is
+                // completed
+            case exec::shared::QueryResult_QueryState_CANCELED:
+                {
+                    ret=handleTerminatedQryState(ret,
+                            getMessage(ERR_QRY_CANCELED),
+                            pDrillClientQueryResult);
+                    m_pendingRequests--;
+                }
+                break;
+            case exec::shared::QueryResult_QueryState_COMPLETED:
+                {
+                    // DO NOT call handleTerminateQryState because that
+                    // signals an error condition and the synchronous API
+                    // will then free the query result object without it
+                    // being processed by the application.
+                    ret=QRY_COMPLETED;
+                    m_pendingRequests--;
+                }
+                break;
+
+            default:
+                {
+                    DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Unknown Query State.\n";
+                    ret=handleQryError(QRY_INTERNAL_ERROR,
+                            getMessage(ERR_QRY_UNKQRYSTATE),
+                            pDrillClientQueryResult);
+                }
+                break;
+        }
+        return ret;
+}
+
 void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
     // if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
     if(!err){
@@ -690,6 +738,14 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
                 }
                 return;
             }
+        }else if(!error && msg.m_rpc_type==exec::user::ACK){
+            // Cancel requests will result in an ACK sent back.
+            // Consume silently
+            if(m_pendingRequests!=0){
+                boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
+                getNextResult();
+            }
+            return;
         }else{
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
             if(error){
@@ -769,6 +825,7 @@ status_t DrillClientImpl::handleQryError(status_t status,
         const exec::shared::DrillPBError& e,
         DrillClientQueryResult* pQueryResult){
     assert(pQueryResult!=NULL);
+    if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
     this->m_pError = DrillClientError::getErrorObject(e);
     pQueryResult->signalError(this->m_pError);
     m_pendingRequests--;
@@ -796,11 +853,23 @@ status_t DrillClientImpl::handleTerminatedQryState(
     DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
     if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
     m_pError=pErr;
-    m_pendingRequests--;
     pQueryResult->signalError(pErr);
     return status;
 }
 
+
+void DrillClientImpl::clearCancelledEntries(){
+
+    std::map<int, DrillClientQueryResult*>::iterator iter;
+    boost::lock_guard<boost::mutex> lock(m_dcMutex);
+
+    if(!m_cancelledQueries.empty()){
+        std::set<exec::shared::QueryId*, compareQueryId>::iterator it;
+        m_cancelledQueries.erase(m_cancelledQueries.begin(), m_cancelledQueries.end());
+    }
+}
+
+
 void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
     std::map<int, DrillClientQueryResult*>::iterator iter;
     boost::lock_guard<boost::mutex> lock(m_dcMutex);
@@ -813,6 +882,13 @@ void DrillClientImpl::clearMapEntries(DrillClientQueryResult* pQueryResult){
         }
     }
     if(!m_queryResults.empty()){
+        // Save the query id and state and free when the query is complete
+        if(pQueryResult->getQueryState()!=exec::shared::QueryResult_QueryState_COMPLETED
+                && pQueryResult->getQueryState()!=exec::shared::QueryResult_QueryState_FAILED){
+            exec::shared::QueryId* pQueryId=new exec::shared::QueryId();
+            pQueryId->CopyFrom(pQueryResult->getQueryId());
+            m_cancelledQueries.insert(pQueryId);
+        }
         std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
         for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
             if(pQueryResult==(DrillClientQueryResult*)it->second){
@@ -907,6 +983,7 @@ status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
     DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;
     //check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
     if(this->m_bCancel){
+        delete b;
         return QRY_FAILURE;
     }
     if (!err) {
@@ -1010,7 +1087,8 @@ void DrillClientQueryResult::clearAndDestroy(){
         }
         m_columnDefs->clear();
     }
-    //Tell the parent to remove this from it's lists
+    DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;
+    //Tell the parent to remove this from its lists
     m_pClient->clearMapEntries(this);
 
     //clear query id map entries.
@@ -1018,7 +1096,7 @@ void DrillClientQueryResult::clearAndDestroy(){
         delete this->m_pQueryId; this->m_pQueryId=NULL;
     }
     if(!m_recordBatches.empty()){
-        // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the servrer _after_
+        // When multiple qwueries execute in parallel we sometimes get an empty record batch back from the server _after_
         // the last chunk has been received. We eventually delete it.
         DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;
         RecordBatch* pR=NULL;

http://git-wip-us.apache.org/repos/asf/drill/blob/c051bbd8/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 8e2f437..a5eeb77 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -42,7 +42,6 @@
 #include <zookeeper/zookeeper.h>
 #endif
 
-#include "drill/common.hpp"
 #include "drill/drillClient.hpp"
 #include "rpcEncoder.hpp"
 #include "rpcDecoder.hpp"
@@ -73,6 +72,7 @@ class DrillClientQueryResult{
         m_bHasSchemaChanged(false),
         m_bHasData(false),
         m_bHasError(false),
+        m_queryState(exec::shared::QueryResult_QueryState_PENDING),
         m_pError(NULL),
         m_pQueryId(NULL),
         m_pSchemaListener(NULL),
@@ -126,10 +126,14 @@ class DrillClientQueryResult{
     void setQueryStatus(status_t s){ m_status = s;}
     status_t getQueryStatus(){ return m_status;}
 
+    void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;}
+    exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;}
+
     private:
     status_t setupColumnDefs(exec::shared::QueryResult* pQueryResult);
     status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err);
     // Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables.
+    // Also used when a query is cancelled or when a query completed response is received.
     void signalError(DrillClientError* pErr);
     void clearAndDestroy();
 
@@ -162,6 +166,9 @@ class DrillClientQueryResult{
     bool m_bHasData;
     bool m_bHasError;
 
+    // state in the last query result received from the server.
+    exec::shared::QueryResult_QueryState m_queryState;
+
     const DrillClientError* m_pError;
 
     exec::shared::QueryId* m_pQueryId;
@@ -200,6 +207,7 @@ class DrillClientImpl{
             //Cancel any pending requests
             //Clear and destroy DrillClientQueryResults vector?
 
+            clearCancelledEntries();
             m_deadlineTimer.cancel();
             m_io_service.stop();
             boost::system::error_code ignorederr;
@@ -258,7 +266,10 @@ class DrillClientImpl{
                 InBoundRpcMessage& msg,
                 boost::system::error_code& error);
         status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
+        status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
         status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg );
+        status_t processQueryStatusResult( exec::shared::QueryResult* qr,
+                DrillClientQueryResult* pDrillClientQueryResult);
         void handleReadTimeout(const boost::system::error_code & err);
         void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
         status_t validateMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
@@ -268,12 +279,13 @@ class DrillClientImpl{
                 const exec::shared::DrillPBError& e,
                 DrillClientQueryResult* pQueryResult);
         // handle query state indicating query is COMPELTED or CANCELED
-        // (i.e., COMPELTED or CANCELE)
+        // (i.e., COMPELTED or CANCELED)
         status_t handleTerminatedQryState(status_t status,
                 std::string msg,
                 DrillClientQueryResult* pQueryResult);
         void broadcastError(DrillClientError* pErr);
         void clearMapEntries(DrillClientQueryResult* pQueryResult);
+        void clearCancelledEntries();
         void sendAck(InBoundRpcMessage& msg, bool isOk);
         void sendCancel(exec::shared::QueryId* pQueryId);
 
@@ -311,8 +323,13 @@ class DrillClientImpl{
         // Map of coordination id to  Query Ids.
         std::map<int, DrillClientQueryResult*> m_queryIds;
 
-        // Map of query id to query result
+        // Map of query id to query result for currently executing queries
         std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
+        //
+        // State for every Query id whose queries have result data pending but which
+        // have been cancelled and whose resources have been released by the client application.
+        // The entry is cleared when the state changes to completed or failed.
+        std::set<exec::shared::QueryId*, compareQueryId> m_cancelledQueries;
 
 };
 

http://git-wip-us.apache.org/repos/asf/drill/blob/c051bbd8/contrib/native/client/src/clientlib/errmsgs.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/errmsgs.cpp b/contrib/native/client/src/clientlib/errmsgs.cpp
index 7a7fa6a..e09bda1 100644
--- a/contrib/native/client/src/clientlib/errmsgs.cpp
+++ b/contrib/native/client/src/clientlib/errmsgs.cpp
@@ -50,11 +50,11 @@ static Drill::ErrorMessages errorMessages[]={
     {ERR_QRY_TIMOUT, ERR_CATEGORY_QRY, 0, "Timed out waiting for server to respond."},
     {ERR_QRY_FAILURE, ERR_CATEGORY_QRY, 0, "Query execution error. Details:[ \n%s\n]"},
     {ERR_QRY_SELVEC2, ERR_CATEGORY_QRY, 0, "Receiving a selection_vector_2 from the server came as a complete surprise at this point"},
-    {ERR_QRY_RESPFAIL, ERR_CATEGORY_QRY, 0, "Got a RESPONSE_FAILURE from the server and don't know what to do"},
+    {ERR_QRY_RESPFAIL, ERR_CATEGORY_QRY, 0, "Received a RESPONSE_FAILURE from the server."},
     {ERR_QRY_UNKQRYSTATE, ERR_CATEGORY_QRY, 0, "Got an unknown query state message from the server."},
-    {ERR_QRY_UNKQRY, ERR_CATEGORY_QRY, 0, "The server didn't find this query"},
-    {ERR_QRY_CANCELED, ERR_CATEGORY_QRY, 0, "The server says this query has been cancelled"},
-    {ERR_QRY_COMPLETED, ERR_CATEGORY_QRY, 0, "Received query_state: COMPLETED."},
+    {ERR_QRY_UNKQRY, ERR_CATEGORY_QRY, 0, "Query not found on server. It might have been terminated already."},
+    {ERR_QRY_CANCELED, ERR_CATEGORY_QRY, 0, "Query has been cancelled"},
+    {ERR_QRY_COMPLETED, ERR_CATEGORY_QRY, 0, "Query completed."},
     {ERR_QRY_16, ERR_CATEGORY_QRY, 0, "Query Failed."},
     {ERR_QRY_17, ERR_CATEGORY_QRY, 0, "Query Failed."},
     {ERR_QRY_18, ERR_CATEGORY_QRY, 0, "Query Failed."},

http://git-wip-us.apache.org/repos/asf/drill/blob/c051bbd8/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp
index 59537f1..f83aae4 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -29,6 +29,7 @@
 #endif
 
 #include <stdint.h>
+#include <set>
 #include <string>
 #include <vector>
 #include <boost/shared_ptr.hpp>


Mime
View raw message