pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [pulsar] branch master updated: CPP Client - Error out Lookup request after operation timeout. (#3300)
Date Fri, 04 Jan 2019 20:38:18 GMT
This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 58c8e87  CPP Client - Error out Lookup request after operation timeout. (#3300)
58c8e87 is described below

commit 58c8e87260da30a65b153c8d80b1980ab378f721
Author: Jai Asher <jai1@ccs.neu.edu>
AuthorDate: Fri Jan 4 12:38:13 2019 -0800

    CPP Client - Error out Lookup request after operation timeout. (#3300)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 24 ++++++++++++++++++++----
 pulsar-client-cpp/lib/ClientConnection.h  |  9 ++++++++-
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index d170611..a19bd44 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -701,7 +701,8 @@ void ClientConnection::handleIncomingCommand() {
                     PendingLookupRequestsMap::iterator it =
                         pendingLookupRequests_.find(partitionMetadataResponse.request_id());
                     if (it != pendingLookupRequests_.end()) {
-                        LookupDataResultPromisePtr lookupDataPromise = it->second;
+                        it->second.timer->cancel();
+                        LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
                         pendingLookupRequests_.erase(it);
                         numOfPendingLookupRequest_--;
                         lock.unlock();
@@ -785,7 +786,8 @@ void ClientConnection::handleIncomingCommand() {
                     PendingLookupRequestsMap::iterator it =
                         pendingLookupRequests_.find(lookupTopicResponse.request_id());
                     if (it != pendingLookupRequests_.end()) {
-                        LookupDataResultPromisePtr lookupDataPromise = it->second;
+                        it->second.timer->cancel();
+                        LookupDataResultPromisePtr lookupDataPromise = it->second.promise;
                         pendingLookupRequests_.erase(it);
                         numOfPendingLookupRequest_--;
                         lock.unlock();
@@ -1088,7 +1090,14 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const
uint64_t request
         promise->setFailed(ResultTooManyLookupRequestException);
         return;
     }
-    pendingLookupRequests_.insert(std::make_pair(requestId, promise));
+    LookupRequestData requestData;
+    requestData.timer = executor_->createDeadlineTimer();
+    requestData.timer->expires_from_now(operationsTimeout_);
+    requestData.timer->async_wait(
+        boost::bind(&ClientConnection::handleLookupTimeout, shared_from_this(), _1, requestData));
+    requestData.promise = promise;
+
+    pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
     numOfPendingLookupRequest_++;
     lock.unlock();
     sendCommand(cmd);
@@ -1200,6 +1209,13 @@ void ClientConnection::handleRequestTimeout(const boost::system::error_code&
ec,
     }
 }
 
+void ClientConnection::handleLookupTimeout(const boost::system::error_code& ec,
+                                           LookupRequestData pendingRequestData) {
+    if (!ec) {
+        pendingRequestData.promise->setFailed(ResultTimeout);
+    }
+}
+
 void ClientConnection::handleKeepAliveTimeout() {
     if (isClosed()) {
         return;
@@ -1276,7 +1292,7 @@ void ClientConnection::close() {
 
     for (PendingLookupRequestsMap::iterator it = pendingLookupRequests.begin();
          it != pendingLookupRequests.end(); ++it) {
-        it->second->setFailed(ResultConnectError);
+        it->second.promise->setFailed(ResultConnectError);
     }
 
     for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap.begin();
diff --git a/pulsar-client-cpp/lib/ClientConnection.h b/pulsar-client-cpp/lib/ClientConnection.h
index bdbc8e5..ac8d6c5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -154,6 +154,11 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
         DeadlineTimerPtr timer;
     };
 
+    struct LookupRequestData {
+        LookupDataResultPromisePtr promise;
+        DeadlineTimerPtr timer;
+    };
+
     /*
      * handler for connectAsync
      * creates a ConnectionPtr which has a valid ClientConnection object
@@ -191,6 +196,8 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
 
     void handleRequestTimeout(const boost::system::error_code& ec, PendingRequestData
pendingRequestData);
 
+    void handleLookupTimeout(const boost::system::error_code&, LookupRequestData);
+
     void handleKeepAliveTimeout();
 
     template <typename Handler>
@@ -259,7 +266,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
     typedef std::map<long, PendingRequestData> PendingRequestsMap;
     PendingRequestsMap pendingRequests_;
 
-    typedef std::map<long, LookupDataResultPromisePtr> PendingLookupRequestsMap;
+    typedef std::map<long, LookupRequestData> PendingLookupRequestsMap;
     PendingLookupRequestsMap pendingLookupRequests_;
 
     typedef std::map<long, ProducerImplWeakPtr> ProducersMap;


Mime
View raw message