drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From par...@apache.org
Subject [1/2] drill git commit: DRILL-4647: C++ client fails to propagate a dead connection error to the application. This closes #493
Date Fri, 15 Jul 2016 18:51:59 GMT
Repository: drill
Updated Branches:
  refs/heads/master 9878170d0 -> ba2280612


DRILL-4647: C++ client fails to propagate a dead connection error to the application.
This closes #493


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/6c67f458
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/6c67f458
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/6c67f458

Branch: refs/heads/master
Commit: 6c67f458b9bb1a6a84f901561bb7686315b0207f
Parents: 9878170
Author: Parth Chandra <parthc@apache.org>
Authored: Fri Apr 29 17:38:44 2016 -0700
Committer: Parth Chandra <parthc@apache.org>
Committed: Fri Jul 15 11:51:14 2016 -0700

----------------------------------------------------------------------
 .../native/client/src/clientlib/drillClient.cpp |  4 ++
 .../client/src/clientlib/drillClientImpl.cpp    | 46 ++++++++++++++------
 2 files changed, 36 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/6c67f458/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index 92c5194..1251058 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -369,6 +369,10 @@ status_t DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, p
 
     ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t);
     DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, listener, listenerCtx);
+    if(pResult==NULL){
+        *qHandle=NULL;
+        return (status_t)this->m_pImpl->getError()->status;
+    }
     *qHandle=(QueryHandle_t)pResult;
     return QRY_SUCCESS;
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/6c67f458/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 3ec01f5..b5d5a31 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -218,6 +218,8 @@ void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & e
                 // Close connection.
                 DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";)
                 shutdownSocket();
+                //broadcast to any executing queries
+                handleConnError(CONN_FAILURE, getMessage(ERR_QRY_COMMERR, "Connection to drillbit lost."));
             }
         }
     }
@@ -469,34 +471,50 @@ DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t
 
     uint64_t coordId;
     DrillClientQueryResult* pQuery=NULL;
+    connectionStatus_t cStatus=CONN_SUCCESS;
     {
         boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
         boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex);
         coordId = this->getNextCoordinationId();
         OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::RUN_QUERY, coordId, &query);
-        sendSync(out_msg);
 
+        // Create the result object and register the listener before we send the query
+        // because sometimes the caller is not checking the status of the submitQuery call.
+        // This way, the broadcast error call will cause the results listener to be called
+        // with a COMM_ERROR status.
         pQuery = new DrillClientQueryResult(this, coordId, plan);
         pQuery->registerListener(l, lCtx);
-        bool sendRequest=false;
         this->m_queryIds[coordId]=pQuery;
 
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query request. " << "[" << m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
-        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query " <<  "Coordination id = " << coordId << " query: " << plan << std::endl;)
+        connectionStatus_t cStatus=sendSync(out_msg);
+        if(cStatus == CONN_SUCCESS){
+            bool sendRequest=false;
 
-        if(m_pendingRequests++==0){
-            sendRequest=true;
-        }else{
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;)
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
-        }
-        if(sendRequest){
-            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
-                << m_pendingRequests << std::endl;)
-            getNextResult(); // async wait for results
+            DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query request. " << "[" << m_connectedHost << "]"  << "Coordination id = " << coordId << std::endl;)
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)  << "Sent query " <<  "Coordination id = " << coordId << " query: " << plan << std::endl;)
+
+                if(m_pendingRequests++==0){
+                    sendRequest=true;
+                }else{
+                    DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queueing query request to server" << std::endl;)
+                        DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
+                }
+            if(sendRequest){
+                DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending query request. Number of pending requests = "
+                        << m_pendingRequests << std::endl;)
+                    getNextResult(); // async wait for results
+            }
         }
+
+    }
+    if(cStatus!=CONN_SUCCESS){
+        this->m_queryIds.erase(coordId);
+        delete pQuery;
+        return NULL;
     }
 
+
+
     //run this in a new thread
     startMessageListener();
 


Mime
View raw message