drill-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sudhe...@apache.org
Subject [1/4] drill git commit: DRILL-5218: Support optionally disabling heartbeats from C++ client
Date Mon, 30 Jan 2017 19:42:38 GMT
Repository: drill
Updated Branches:
  refs/heads/master 2af709f43 -> 60624af22


DRILL-5218: Support optionally disabling heartbeats from C++ client

closes #726


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/837722c7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/837722c7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/837722c7

Branch: refs/heads/master
Commit: 837722c7433cf89447b71aa6139f22f03992875c
Parents: 2af709f
Author: Sudheesh Katkam <sudheesh@apache.org>
Authored: Wed Jan 25 14:43:41 2017 -0800
Committer: Sudheesh Katkam <sudheesh@apache.org>
Committed: Mon Jan 30 10:09:38 2017 -0800

----------------------------------------------------------------------
 .../native/client/example/querySubmitter.cpp    |  5 +++
 .../native/client/src/clientlib/drillClient.cpp |  2 +-
 .../client/src/clientlib/drillClientImpl.cpp    | 40 +++++---------------
 .../client/src/clientlib/drillClientImpl.hpp    |  3 +-
 4 files changed, 17 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/example/querySubmitter.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/example/querySubmitter.cpp b/contrib/native/client/example/querySubmitter.cpp
index 2eeaf35..60f7b8a 100644
--- a/contrib/native/client/example/querySubmitter.cpp
+++ b/contrib/native/client/example/querySubmitter.cpp
@@ -41,6 +41,7 @@ struct Option{
     {"syncSend", "Send query only after previous result is received", false},
     {"hshakeTimeout", "Handshake timeout (second).", false},
     {"queryTimeout", "Query timeout (second).", false},
+    {"heartbeatFrequency", "Heartbeat frequency (second). Disabled if set to 0.", false},
     {"user", "Username", false},
     {"password", "Password", false}
 };
@@ -282,6 +283,7 @@ int main(int argc, char* argv[]) {
         std::string syncSend=qsOptionValues["syncSend"];
         std::string hshakeTimeout=qsOptionValues["hshakeTimeout"];
         std::string queryTimeout=qsOptionValues["queryTimeout"];
+        std::string heartbeatFrequency=qsOptionValues["heartbeatFrequency"];
         std::string user=qsOptionValues["user"];
         std::string password=qsOptionValues["password"];
 
@@ -343,6 +345,9 @@ int main(int argc, char* argv[]) {
         if (!queryTimeout.empty()){
             Drill::DrillClientConfig::setQueryTimeout(atoi(queryTimeout.c_str()));
         }
+        if(!heartbeatFrequency.empty()) {
+            Drill::DrillClientConfig::setHeartbeatFrequency(atoi(heartbeatFrequency.c_str()));
+        }
 
         Drill::DrillUserProperties props;
         if(schema.length()>0){

http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/src/clientlib/drillClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClient.cpp b/contrib/native/client/src/clientlib/drillClient.cpp
index b02f993..fe9c3a6 100644
--- a/contrib/native/client/src/clientlib/drillClient.cpp
+++ b/contrib/native/client/src/clientlib/drillClient.cpp
@@ -105,7 +105,7 @@ void DrillClientConfig::setQueryTimeout(int32_t t){
 }
 
 void DrillClientConfig::setHeartbeatFrequency(int32_t t){
-    if (t>0){
+    if (t>=0){
         boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex);
         s_heartbeatFrequency=t;
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/src/clientlib/drillClientImpl.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.cpp b/contrib/native/client/src/clientlib/drillClientImpl.cpp
index 51ae1a2..038ca90 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.cpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.cpp
@@ -150,15 +150,18 @@ connectionStatus_t DrillClientImpl::connect(const char* host, const char* port){
 }
 
 void DrillClientImpl::startHeartbeatTimer(){
-    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
-        << DrillClientConfig::getHeartbeatFrequency() << " seconds." << std::endl;)
-    m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
-    m_heartbeatTimer.async_wait(boost::bind(
+    if (DrillClientConfig::getHeartbeatFrequency() > 0) {
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
+                                          << DrillClientConfig::getHeartbeatFrequency()
+                                          << " seconds." << std::endl;)
+        m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
+        m_heartbeatTimer.async_wait(boost::bind(
                 &DrillClientImpl::handleHeartbeatTimeout,
                 this,
                 boost::asio::placeholders::error
-                ));
+        ));
         startMessageListener(); // start this thread early so we don't have the timer blocked
+    }
 }
 
 connectionStatus_t DrillClientImpl::sendHeartbeat(){
@@ -178,12 +181,6 @@ connectionStatus_t DrillClientImpl::sendHeartbeat(){
     return status;
 }
 
-void DrillClientImpl::resetHeartbeatTimer(){
-    m_heartbeatTimer.cancel();
-    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Reset Heartbeat timer." << std::endl;)
-    startHeartbeatTimer();
-}
-
 void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
     DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
     if(err != boost::asio::error::operation_aborted){
@@ -350,7 +347,7 @@ connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* prope
     u2b.set_channel(exec::shared::USER);
     u2b.set_rpc_version(DRILL_RPC_VERSION);
     u2b.set_support_listening(true);
-    u2b.set_support_timeout(true);
+    u2b.set_support_timeout(DrillClientConfig::getHeartbeatFrequency() > 0);
 
     // Adding version info
     exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
@@ -663,7 +660,7 @@ void DrillClientImpl::getNextResult(){
             ));
     }
 
-    resetHeartbeatTimer();
+    startHeartbeatTimer();
 
     async_read(
             this->m_socket,
@@ -1399,23 +1396,6 @@ void DrillClientImpl::handleRead(ByteBuf_t _buf,
             s = processQueryData(allocatedBuffer, msg);
             break;
 
-        case exec::user::HANDSHAKE:
-            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake request from server. Send response.\n";)
-            delete allocatedBuffer;
-            // In one case when the client hung, we observed that the server was sending a handshake request to the client
-            // We should properly handle these handshake requests/responses
-            {
-                boost::lock_guard<boost::mutex> lockDC(this->m_dcMutex);
-                exec::user::UserToBitHandshake u2b;
-                u2b.set_channel(exec::shared::USER);
-                u2b.set_rpc_version(DRILL_RPC_VERSION);
-                u2b.set_support_listening(true);
-                rpc::OutBoundRpcMessage out_msg(exec::rpc::RESPONSE, exec::user::HANDSHAKE, msg.m_coord_id, &u2b);
-                sendSync(out_msg);
-                DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response sent.\n";)
-            }
-            break;
-
         default:
             DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
                     << "QueryResult returned " << msg.m_rpc_type << std::endl;)

http://git-wip-us.apache.org/repos/asf/drill/blob/837722c7/contrib/native/client/src/clientlib/drillClientImpl.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/drillClientImpl.hpp b/contrib/native/client/src/clientlib/drillClientImpl.hpp
index 8da37b6..22e34af 100644
--- a/contrib/native/client/src/clientlib/drillClientImpl.hpp
+++ b/contrib/native/client/src/clientlib/drillClientImpl.hpp
@@ -451,9 +451,8 @@ class DrillClientImpl : public DrillClientImplBase{
         // Direct connection to a drillbit
         // host can be name or ip address, port can be port number or name of service in /etc/services
         connectionStatus_t connect(const char* host, const char* port);
-        void startHeartbeatTimer();// start a heartbeat timer
+        void startHeartbeatTimer();// start or restart the heartbeat timer
         connectionStatus_t sendHeartbeat(); // send a heartbeat to the server
-        void resetHeartbeatTimer(); // reset the heartbeat timer (called every time one sends a message to the server (after sendAck, or submitQuery)
         void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
 
         int32_t getNextCoordinationId(){ return ++m_coordinationId; };


Mime
View raw message