pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Replaced boost::bind with std::bind (#3484)
Date Fri, 01 Feb 2019 16:51:56 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dd5d051  Replaced boost::bind with std::bind (#3484)
dd5d051 is described below

commit dd5d05197a9352aba392ae30e7c60e05fc52be0b
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Fri Feb 1 08:51:51 2019 -0800

    Replaced boost::bind with std::bind (#3484)
    
    * Replaced boost::bind with std::bind
    
    * Fixed merging with master
    
    * std::bind doesn't work with overloaded functions
    
    * Go back to asio thread to executor service
    
    * Use proper ref-count increase for WaitForCallback utility
---
 pulsar-client-cpp/examples/SampleAsyncProducer.cc  |  5 +-
 pulsar-client-cpp/examples/SampleProducer.cc       |  2 -
 pulsar-client-cpp/lib/BatchMessageContainer.cc     |  5 +-
 pulsar-client-cpp/lib/BatchMessageContainer.h      |  1 -
 pulsar-client-cpp/lib/BinaryProtoLookupService.cc  |  1 -
 pulsar-client-cpp/lib/ClientConnection.cc          | 94 +++++++++++-----------
 pulsar-client-cpp/lib/ClientImpl.cc                |  8 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 13 ++-
 pulsar-client-cpp/lib/ExecutorService.cc           | 12 +--
 pulsar-client-cpp/lib/ExecutorService.h            |  1 +
 pulsar-client-cpp/lib/HTTPLookupService.h          |  1 -
 pulsar-client-cpp/lib/HandlerBase.cc               |  1 -
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |  4 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  4 +-
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |  1 -
 .../lib/PatternMultiTopicsConsumerImpl.cc          | 11 +--
 pulsar-client-cpp/lib/ProducerImpl.cc              | 17 ++--
 .../lib/UnAckedMessageTrackerEnabled.cc            | 19 +++--
 .../lib/UnAckedMessageTrackerEnabled.h             |  1 -
 .../lib/UnAckedMessageTrackerInterface.h           |  1 -
 pulsar-client-cpp/lib/Utils.h                      | 12 +--
 pulsar-client-cpp/lib/auth/AuthAthenz.cc           |  4 +-
 pulsar-client-cpp/lib/auth/AuthToken.cc            | 10 +--
 pulsar-client-cpp/lib/c/c_Authentication.cc        |  2 +-
 pulsar-client-cpp/lib/c/c_Client.cc                | 30 ++++---
 pulsar-client-cpp/lib/c/c_Consumer.cc              | 23 +++---
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc |  4 +-
 pulsar-client-cpp/lib/c/c_Producer.cc              |  9 +--
 pulsar-client-cpp/lib/c/c_Reader.cc                |  2 +-
 pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc   |  3 +-
 pulsar-client-cpp/lib/c/c_structs.h                |  2 +-
 pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc   |  8 +-
 pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h    |  1 -
 pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc   |  6 +-
 pulsar-client-cpp/lib/stats/ProducerStatsImpl.h    |  1 -
 pulsar-client-cpp/perf/PerfConsumer.cc             |  4 +-
 pulsar-client-cpp/perf/PerfProducer.cc             |  7 +-
 pulsar-client-cpp/python/src/producer.cc           |  5 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 68 ++++++++++------
 pulsar-client-cpp/tests/ConsumerStatsTest.cc       | 19 +++--
 pulsar-client-cpp/tests/ZeroQueueSizeTest.cc       |  5 +-
 41 files changed, 222 insertions(+), 205 deletions(-)

diff --git a/pulsar-client-cpp/examples/SampleAsyncProducer.cc b/pulsar-client-cpp/examples/SampleAsyncProducer.cc
index 3633c07..43d6cc7 100644
--- a/pulsar-client-cpp/examples/SampleAsyncProducer.cc
+++ b/pulsar-client-cpp/examples/SampleAsyncProducer.cc
@@ -17,10 +17,9 @@
  * under the License.
  */
 #include <iostream>
-#include <boost/bind.hpp>
+#include <thread>
 
 #include <pulsar/Client.h>
-#include <pulsar/MessageBuilder.h>
 
 #include <lib/LogUtils.h>
 
@@ -47,7 +46,7 @@ int main() {
         Message msg = MessageBuilder().setContent("content").setProperty("x", "1").build();
         producer.sendAsync(msg, callback);
 
-        sleep(1);
+        std::this_thread::sleep_for(std::chrono::seconds(1));
     }
 
     client.close();
diff --git a/pulsar-client-cpp/examples/SampleProducer.cc b/pulsar-client-cpp/examples/SampleProducer.cc
index 0ee7ae5..bc1ece2 100644
--- a/pulsar-client-cpp/examples/SampleProducer.cc
+++ b/pulsar-client-cpp/examples/SampleProducer.cc
@@ -17,10 +17,8 @@
  * under the License.
  */
 #include <iostream>
-#include <boost/bind.hpp>
 
 #include <pulsar/Client.h>
-#include <pulsar/MessageBuilder.h>
 
 #include <lib/LogUtils.h>
 
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 3709517..4a512ee 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -18,6 +18,7 @@
  */
 #include "BatchMessageContainer.h"
 #include <memory>
+#include <functional>
 
 namespace pulsar {
 
@@ -80,8 +81,8 @@ void BatchMessageContainer::startTimer() {
     const unsigned long& publishDelayInMs = producer_.conf_.getBatchingMaxPublishDelayMs();
     LOG_DEBUG(*this << " Timer started with expiry after " << publishDelayInMs);
     timer_->expires_from_now(boost::posix_time::milliseconds(publishDelayInMs));
-    timer_->async_wait(boost::bind(&pulsar::ProducerImpl::batchMessageTimeoutHandler, &producer_,
-                                   boost::asio::placeholders::error));
+    timer_->async_wait(
+        std::bind(&pulsar::ProducerImpl::batchMessageTimeoutHandler, &producer_, std::placeholders::_1));
 }
 
 void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h b/pulsar-client-cpp/lib/BatchMessageContainer.h
index b36da15..4e43e67 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.h
@@ -36,7 +36,6 @@
 #include "Commands.h"
 #include "LogUtils.h"
 #include "ObjectPool.h"
-#include <boost/bind.hpp>
 #include "ExecutorService.h"
 #include <boost/asio.hpp>
 #include "ProducerImpl.h"
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index 3040d69..160efc9 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -21,7 +21,6 @@
 
 #include <lib/TopicName.h>
 
-#include <boost/bind.hpp>
 #include "ConnectionPool.h"
 
 #include <string>
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 2db122b..b8c656d 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -33,8 +33,7 @@
 #include "LogUtils.h"
 #include "Url.h"
 
-#include <boost/bind.hpp>
-
+#include <functional>
 #include <string>
 
 #include "ProducerImpl.h"
@@ -217,8 +216,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
         // Only send keep-alive probes if the broker supports it
         keepAliveTimer_ = executor_->createDeadlineTimer();
         keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        keepAliveTimer_->async_wait(
-            boost::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
     }
 
     if (serverProtocolVersion_ >= v8) {
@@ -248,9 +246,9 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
         consumerStatsRequests.push_back(it->first);
     }
     consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
-    consumerStatsRequestTimer_->async_wait(boost::bind(&ClientConnection::handleConsumerStatsTimeout,
-                                                       shared_from_this(), boost::asio::placeholders::error,
-                                                       consumerStatsRequests));
+    consumerStatsRequestTimer_->async_wait(std::bind(&ClientConnection::handleConsumerStatsTimeout,
+                                                     shared_from_this(), std::placeholders::_1,
+                                                     consumerStatsRequests));
     lock.unlock();
     // Complex logic since promises need to be fulfilled outside the lock
     for (int i = 0; i < consumerStatsPromises.size(); i++) {
@@ -320,9 +318,9 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
                     return;
                 }
             }
-            tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
-                                        boost::bind(&ClientConnection::handleHandshake, shared_from_this(),
-                                                    boost::asio::placeholders::error));
+            tlsSocket_->async_handshake(
+                boost::asio::ssl::stream<tcp::socket>::client,
+                std::bind(&ClientConnection::handleHandshake, shared_from_this(), std::placeholders::_1));
         } else {
             handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
         }
@@ -330,9 +328,8 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
         // The connection failed. Try the next endpoint in the list.
         socket_->close();
         tcp::endpoint endpoint = *endpointIterator;
-        socket_->async_connect(endpoint,
-                               boost::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
-                                           boost::asio::placeholders::error, ++endpointIterator));
+        socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
+                                                   std::placeholders::_1, ++endpointIterator));
     } else {
         LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
         close();
@@ -344,9 +341,8 @@ void ClientConnection::handleHandshake(const boost::system::error_code& err) {
     bool connectingThroughProxy = logicalAddress_ != physicalAddress_;
     SharedBuffer buffer = Commands::newConnect(authentication_, logicalAddress_, connectingThroughProxy);
     // Send CONNECT command to broker
-    asyncWrite(buffer.const_asio_buffer(),
-               boost::bind(&ClientConnection::handleSentPulsarConnect, shared_from_this(),
-                           boost::asio::placeholders::error, buffer));
+    asyncWrite(buffer.const_asio_buffer(), std::bind(&ClientConnection::handleSentPulsarConnect,
+                                                     shared_from_this(), std::placeholders::_1, buffer));
 }
 
 void ClientConnection::handleSentPulsarConnect(const boost::system::error_code& err,
@@ -389,9 +385,8 @@ void ClientConnection::tcpConnectAsync() {
 
     LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port());
     tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
-    resolver_->async_resolve(
-        query, boost::bind(&ClientConnection::handleResolve, shared_from_this(),
-                           boost::asio::placeholders::error, boost::asio::placeholders::iterator));
+    resolver_->async_resolve(query, std::bind(&ClientConnection::handleResolve, shared_from_this(),
+                                              std::placeholders::_1, std::placeholders::_2));
 }
 
 void ClientConnection::handleResolve(const boost::system::error_code& err,
@@ -406,8 +401,8 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
         LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name()  //
                              << " to " << endpointIterator->endpoint());
         socket_->async_connect(*endpointIterator++,
-                               boost::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
-                                           boost::asio::placeholders::error, endpointIterator));
+                               std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
+                                         std::placeholders::_1, endpointIterator));
     } else {
         LOG_WARN(cnxString_ << "No IP address found");
         close();
@@ -417,9 +412,10 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
 
 void ClientConnection::readNextCommand() {
     const static uint32_t minReadSize = sizeof(uint32_t);
-    asyncReceive(incomingBuffer_.asio_buffer(),
-                 customAllocReadHandler(
-                     boost::bind(&ClientConnection::handleRead, shared_from_this(), _1, _2, minReadSize)));
+    asyncReceive(
+        incomingBuffer_.asio_buffer(),
+        customAllocReadHandler(std::bind(&ClientConnection::handleRead, shared_from_this(),
+                                         std::placeholders::_1, std::placeholders::_2, minReadSize)));
 }
 
 void ClientConnection::handleRead(const boost::system::error_code& err, size_t bytesTransferred,
@@ -434,8 +430,9 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
         // region
         SharedBuffer buffer = incomingBuffer_.slice(bytesTransferred);
         asyncReceive(buffer.asio_buffer(),
-                     customAllocReadHandler(boost::bind(&ClientConnection::handleRead, shared_from_this(), _1,
-                                                        _2, minReadSize - bytesTransferred)));
+                     customAllocReadHandler(std::bind(&ClientConnection::handleRead, shared_from_this(),
+                                                      std::placeholders::_1, std::placeholders::_2,
+                                                      minReadSize - bytesTransferred)));
     } else {
         processIncomingBuffer();
     }
@@ -459,8 +456,9 @@ void ClientConnection::processIncomingBuffer() {
             if (bytesToReceive <= incomingBuffer_.writableBytes()) {
                 // The rest of the frame still fits in the current buffer
                 asyncReceive(incomingBuffer_.asio_buffer(),
-                             customAllocReadHandler(boost::bind(&ClientConnection::handleRead,
-                                                                shared_from_this(), _1, _2, bytesToReceive)));
+                             customAllocReadHandler(std::bind(&ClientConnection::handleRead,
+                                                              shared_from_this(), std::placeholders::_1,
+                                                              std::placeholders::_2, bytesToReceive)));
                 return;
             } else {
                 // Need to allocate a buffer big enough for the frame
@@ -468,8 +466,9 @@ void ClientConnection::processIncomingBuffer() {
                 incomingBuffer_ = SharedBuffer::copyFrom(incomingBuffer_, newBufferSize);
 
                 asyncReceive(incomingBuffer_.asio_buffer(),
-                             customAllocReadHandler(boost::bind(&ClientConnection::handleRead,
-                                                                shared_from_this(), _1, _2, bytesToReceive)));
+                             customAllocReadHandler(std::bind(&ClientConnection::handleRead,
+                                                              shared_from_this(), std::placeholders::_1,
+                                                              std::placeholders::_2, bytesToReceive)));
                 return;
             }
         }
@@ -524,9 +523,10 @@ void ClientConnection::processIncomingBuffer() {
         // At least we need to read 4 bytes to have the complete frame size
         uint32_t minReadSize = sizeof(uint32_t) - incomingBuffer_.readableBytes();
 
-        asyncReceive(incomingBuffer_.asio_buffer(),
-                     customAllocReadHandler(boost::bind(&ClientConnection::handleRead, shared_from_this(), _1,
-                                                        _2, minReadSize)));
+        asyncReceive(
+            incomingBuffer_.asio_buffer(),
+            customAllocReadHandler(std::bind(&ClientConnection::handleRead, shared_from_this(),
+                                             std::placeholders::_1, std::placeholders::_2, minReadSize)));
         return;
     }
 
@@ -1099,8 +1099,8 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request
     LookupRequestData requestData;
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_from_now(operationsTimeout_);
-    requestData.timer->async_wait(
-        boost::bind(&ClientConnection::handleLookupTimeout, shared_from_this(), _1, requestData));
+    requestData.timer->async_wait(std::bind(&ClientConnection::handleLookupTimeout, shared_from_this(),
+                                            std::placeholders::_1, requestData));
     requestData.promise = promise;
 
     pendingLookupRequests_.insert(std::make_pair(requestId, requestData));
@@ -1114,8 +1114,9 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) {
 
     if (pendingWriteOperations_++ == 0) {
         // Write immediately to socket
-        asyncWrite(cmd.const_asio_buffer(), customAllocWriteHandler(boost::bind(
-                                                &ClientConnection::handleSend, shared_from_this(), _1, cmd)));
+        asyncWrite(cmd.const_asio_buffer(),
+                   customAllocWriteHandler(std::bind(&ClientConnection::handleSend, shared_from_this(),
+                                                     std::placeholders::_1, cmd)));
     } else {
         // Queue to send later
         pendingWriteBuffers_.push_back(cmd);
@@ -1130,8 +1131,8 @@ void ClientConnection::sendMessage(const OpSendMsg& opSend) {
                                                     opSend.sequenceId_, getChecksumType(), opSend.msg_);
 
         // Write immediately to socket
-        asyncWrite(buffer, customAllocWriteHandler(
-                               boost::bind(&ClientConnection::handleSendPair, shared_from_this(), _1)));
+        asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
+                                                             shared_from_this(), std::placeholders::_1)));
     } else {
         // Queue to send later
         pendingWriteBuffers_.push_back(opSend);
@@ -1167,8 +1168,8 @@ void ClientConnection::sendPendingCommands() {
         if (any.type() == typeid(SharedBuffer)) {
             SharedBuffer buffer = boost::any_cast<SharedBuffer>(any);
             asyncWrite(buffer.const_asio_buffer(),
-                       customAllocWriteHandler(
-                           boost::bind(&ClientConnection::handleSend, shared_from_this(), _1, buffer)));
+                       customAllocWriteHandler(std::bind(&ClientConnection::handleSend, shared_from_this(),
+                                                         std::placeholders::_1, buffer)));
         } else {
             assert(any.type() == typeid(OpSendMsg));
 
@@ -1176,8 +1177,8 @@ void ClientConnection::sendPendingCommands() {
             PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, op.producerId_,
                                                         op.sequenceId_, getChecksumType(), op.msg_);
 
-            asyncWrite(buffer, customAllocWriteHandler(
-                                   boost::bind(&ClientConnection::handleSendPair, shared_from_this(), _1)));
+            asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
+                                                                 shared_from_this(), std::placeholders::_1)));
         }
     } else {
         // No more pending writes
@@ -1198,8 +1199,8 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm
     PendingRequestData requestData;
     requestData.timer = executor_->createDeadlineTimer();
     requestData.timer->expires_from_now(operationsTimeout_);
-    requestData.timer->async_wait(
-        boost::bind(&ClientConnection::handleRequestTimeout, shared_from_this(), _1, requestData));
+    requestData.timer->async_wait(std::bind(&ClientConnection::handleRequestTimeout, shared_from_this(),
+                                            std::placeholders::_1, requestData));
 
     pendingRequests_.insert(std::make_pair(requestId, requestData));
     lock.unlock();
@@ -1237,8 +1238,7 @@ void ClientConnection::handleKeepAliveTimeout() {
         sendCommand(Commands::newPing());
 
         keepAliveTimer_->expires_from_now(boost::posix_time::seconds(KeepAliveIntervalInSeconds));
-        keepAliveTimer_->async_wait(
-            boost::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
+        keepAliveTimer_->async_wait(std::bind(&ClientConnection::handleKeepAliveTimeout, shared_from_this()));
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index f3b193d..e489a24 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -27,7 +27,6 @@
 #include "MultiTopicsConsumerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
 #include "SimpleLoggerImpl.h"
-#include <boost/bind.hpp>
 #include <boost/algorithm/string/predicate.hpp>
 #include <sstream>
 #include <openssl/sha.h>
@@ -115,12 +114,11 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
     if (serviceUrl_.compare(0, 4, "http") == 0) {
         LOG_DEBUG("Using HTTP Lookup");
         lookupServicePtr_ =
-            std::make_shared<HTTPLookupService>(boost::cref(serviceUrl_), boost::cref(clientConfiguration_),
-                                                boost::cref(clientConfiguration_.getAuthPtr()));
+            std::make_shared<HTTPLookupService>(std::cref(serviceUrl_), std::cref(clientConfiguration_),
+                                                std::cref(clientConfiguration_.getAuthPtr()));
     } else {
         LOG_DEBUG("Using Binary Lookup");
-        lookupServicePtr_ =
-            std::make_shared<BinaryProtoLookupService>(boost::ref(pool_), boost::ref(serviceUrl));
+        lookupServicePtr_ = std::make_shared<BinaryProtoLookupService>(std::ref(pool_), std::ref(serviceUrl));
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index a2cabf8..e254480 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -20,7 +20,6 @@
 #include "MessageImpl.h"
 #include "Commands.h"
 #include "LogUtils.h"
-#include <boost/bind.hpp>
 #include <lib/TopicName.h>
 #include "pulsar/Result.h"
 #include "pulsar/MessageId.h"
@@ -304,8 +303,8 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
         lock.unlock();
 
         if (asyncReceivedWaiting) {
-            listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                                    shared_from_this(), ResultOk, m, callback));
+            listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                                  shared_from_this(), ResultOk, m, callback));
             return;
         }
 
@@ -341,8 +340,8 @@ void ConsumerImpl::failPendingReceiveCallback() {
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
-        listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                                shared_from_this(), ResultAlreadyClosed, msg, callback));
+        listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                              shared_from_this(), ResultAlreadyClosed, msg, callback));
     }
     lock.unlock();
 }
@@ -391,8 +390,8 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
             ReceiveCallback callback = pendingReceives_.front();
             pendingReceives_.pop();
             lock.unlock();
-            listenerExecutor_->postWork(boost::bind(&ConsumerImpl::notifyPendingReceivedCallback,
-                                                    shared_from_this(), ResultOk, msg, callback));
+            listenerExecutor_->postWork(std::bind(&ConsumerImpl::notifyPendingReceivedCallback,
+                                                  shared_from_this(), ResultOk, msg, callback));
         } else {
             // Regular path, append individual message to incoming messages queue
             incomingMessages_.push(msg);
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 9800fd3..194820a 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -18,18 +18,14 @@
  */
 #include "ExecutorService.h"
 
-#include <boost/ref.hpp>
 #include <boost/asio.hpp>
-#include <boost/bind.hpp>
 #include <functional>
 #include <memory>
 
 namespace pulsar {
 
 ExecutorService::ExecutorService()
-    : io_service_(),
-      work_(new BackgroundWork(io_service_)),
-      worker_(boost::bind(&boost::asio::io_service::run, &io_service_)) {}
+    : io_service_(), work_(new BackgroundWork(io_service_)), worker_([&]() { io_service_.run(); }) {}
 
 ExecutorService::~ExecutorService() { close(); }
 
@@ -38,7 +34,7 @@ ExecutorService::~ExecutorService() { close(); }
  *  @ returns shared_ptr to this socket
  */
 SocketPtr ExecutorService::createSocket() {
-    return std::make_shared<boost::asio::ip::tcp::socket>(boost::ref(io_service_));
+    return std::make_shared<boost::asio::ip::tcp::socket>(std::ref(io_service_));
 }
 
 TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) {
@@ -51,11 +47,11 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ss
  *  @returns shraed_ptr to resolver object
  */
 TcpResolverPtr ExecutorService::createTcpResolver() {
-    return std::make_shared<boost::asio::ip::tcp::resolver>(boost::ref(io_service_));
+    return std::make_shared<boost::asio::ip::tcp::resolver>(std::ref(io_service_));
 }
 
 DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
-    return std::make_shared<boost::asio::deadline_timer>(boost::ref(io_service_));
+    return std::make_shared<boost::asio::deadline_timer>(std::ref(io_service_));
 }
 
 void ExecutorService::close() {
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 545abe5..81144fc 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -23,6 +23,7 @@
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
 #include <functional>
+#include <thread>
 #include <boost/noncopyable.hpp>
 #include <mutex>
 
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h b/pulsar-client-cpp/lib/HTTPLookupService.h
index c8a5f5e..c6998be 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -24,7 +24,6 @@
 #include <lib/Url.h>
 #include <json/value.h>
 #include <json/reader.h>
-#include <boost/bind.hpp>
 #include <curl/curl.h>
 #include <lib/Version.h>
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index e3277bd..5b5b025 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -17,7 +17,6 @@
  * under the License.
  */
 #include "HandlerBase.h"
-#include <boost/bind.hpp>
 
 #include <cassert>
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 90aa9bf..98385d5 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -442,7 +442,7 @@ void MultiTopicsConsumerImpl::messageReceived(Consumer consumer, const Message&
         pendingReceives_.pop();
         lock.unlock();
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
-        listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg));
+        listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
     } else {
         if (messages_.full()) {
             lock.unlock();
@@ -534,7 +534,7 @@ void MultiTopicsConsumerImpl::failPendingReceiveCallback() {
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
-        listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg));
+        listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg));
     }
     lock.unlock();
 }
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 6edf06f..9ce3703 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -352,7 +352,7 @@ void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message&
         pendingReceives_.pop();
         lock.unlock();
         unAckedMessageTrackerPtr_->add(msg.getMessageId());
-        listenerExecutor_->postWork(boost::bind(callback, ResultOk, msg));
+        listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
     } else {
         if (messages_.full()) {
             lock.unlock();
@@ -371,7 +371,7 @@ void PartitionedConsumerImpl::failPendingReceiveCallback() {
     while (!pendingReceives_.empty()) {
         ReceiveCallback callback = pendingReceives_.front();
         pendingReceives_.pop();
-        listenerExecutor_->postWork(boost::bind(callback, ResultAlreadyClosed, msg));
+        listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg));
     }
     lock.unlock();
 }
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index b9cee8f..de33caa 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -19,7 +19,6 @@
 #include <cstdlib>
 #include "PartitionedProducerImpl.h"
 #include "LogUtils.h"
-#include <boost/bind.hpp>
 #include <lib/TopicName.h>
 #include <sstream>
 #include "RoundRobinMessageRouter.h"
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
index 27cb2b4..9abe612 100644
--- a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -41,7 +41,7 @@ void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
     autoDiscoveryRunning_ = false;
     autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
     autoDiscoveryTimer_->async_wait(
-        boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1));
+        std::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, std::placeholders::_1));
 }
 
 void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) {
@@ -70,7 +70,8 @@ void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system:
     assert(namespaceName_);
 
     lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_)
-        .addListener(boost::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, _1, _2));
+        .addListener(std::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this,
+                               std::placeholders::_1, std::placeholders::_2));
 }
 
 void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result result,
@@ -128,8 +129,8 @@ void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopic
     for (std::vector<std::string>::const_iterator itr = addedTopics->begin(); itr != addedTopics->end();
          itr++) {
         MultiTopicsConsumerImpl::subscribeOneTopicAsync(*itr).addListener(
-            boost::bind(&PatternMultiTopicsConsumerImpl::handleOneTopicAdded, this, _1, *itr,
-                        topicsNeedCreate, callback));
+            std::bind(&PatternMultiTopicsConsumerImpl::handleOneTopicAdded, this, std::placeholders::_1, *itr,
+                      topicsNeedCreate, callback));
     }
 }
 
@@ -218,7 +219,7 @@ void PatternMultiTopicsConsumerImpl::start() {
         autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer();
         autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
         autoDiscoveryTimer_->async_wait(
-            boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1));
+            std::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, std::placeholders::_1));
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index 730228e..570fd30 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -22,7 +22,6 @@
 #include "PulsarApi.pb.h"
 #include "Commands.h"
 #include "BatchMessageContainer.h"
-#include <boost/bind.hpp>
 #include <boost/date_time/local_time/local_time.hpp>
 #include <lib/TopicName.h>
 
@@ -60,9 +59,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const
     lastSequenceIdPublished_ = initialSequenceId;
     msgSequenceGenerator_ = initialSequenceId + 1;
 
-    // boost::ref is used to drop the constantness constraint of make_shared
+    // std::ref is used to drop the constantness constraint of make_shared
     if (conf_.getBatchingEnabled()) {
-        batchMessageContainer = std::make_shared<BatchMessageContainer>(boost::ref(*this));
+        batchMessageContainer = std::make_shared<BatchMessageContainer>(std::ref(*this));
     }
 
     unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
@@ -83,7 +82,7 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const
         dataKeyGenTImer_ = executor_->createDeadlineTimer();
         dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
         dataKeyGenTImer_->async_wait(
-            boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, boost::asio::placeholders::error));
+            std::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, std::placeholders::_1));
     }
 }
 
@@ -113,8 +112,8 @@ void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
     msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
 
     dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
-    dataKeyGenTImer_->async_wait(boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, shared_from_this(),
-                                             boost::asio::placeholders::error));
+    dataKeyGenTImer_->async_wait(
+        std::bind(&pulsar::ProducerImpl::refreshEncryptionKey, shared_from_this(), std::placeholders::_1));
 }
 
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
@@ -180,7 +179,8 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         if (!sendTimer_ && conf_.getSendTimeout() > 0) {
             sendTimer_ = executor_->createDeadlineTimer();
             sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
-            sendTimer_->async_wait(boost::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), _1));
+            sendTimer_->async_wait(
+                std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
         }
 
         producerCreatedPromise_.setValue(shared_from_this());
@@ -559,7 +559,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
     }
 
     // Asynchronously wait for the timeout to trigger
-    sendTimer_->async_wait(boost::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), _1));
+    sendTimer_->async_wait(
+        std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
 }
 
 bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 248a60d..2c768b2 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -18,25 +18,24 @@
  */
 #include "UnAckedMessageTrackerEnabled.h"
 
+#include <functional>
+
 DECLARE_LOG_OBJECT();
 
 namespace pulsar {
 
-void UnAckedMessageTrackerEnabled::timeoutHandler(const boost::system::error_code& ec) {
-    if (ec) {
-        LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
-    } else {
-        timeoutHandler();
-    }
-}
-
 void UnAckedMessageTrackerEnabled::timeoutHandler() {
     timeoutHandlerHelper();
     ExecutorServicePtr executorService = client_->getIOExecutorProvider()->get();
     timer_ = executorService->createDeadlineTimer();
     timer_->expires_from_now(boost::posix_time::milliseconds(timeoutMs_));
-    timer_->async_wait(boost::bind(&pulsar::UnAckedMessageTrackerEnabled::timeoutHandler, this,
-                                   boost::asio::placeholders::error));
+    timer_->async_wait([&](const boost::system::error_code& ec) {
+        if (ec) {
+            LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
+        } else {
+            timeoutHandler();
+        }
+    });
 }
 
 void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
index ea92305..921e747 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.h
@@ -37,7 +37,6 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
     void clear();
 
    private:
-    void timeoutHandler(const boost::system::error_code& ec);
     void timeoutHandlerHelper();
     bool isEmpty();
     long size();
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
index ea49912..a4e83e9 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerInterface.h
@@ -26,7 +26,6 @@
 #include "pulsar/MessageId.h"
 #include "lib/ClientImpl.h"
 #include "lib/ConsumerImplBase.h"
-#include <boost/bind.hpp>
 #include <boost/asio.hpp>
 #include <lib/LogUtils.h>
 #include "lib/PulsarApi.pb.h"
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index 620cf60..cd2fb6d 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -29,18 +29,18 @@
 namespace pulsar {
 
 struct WaitForCallback {
-    Promise<bool, Result>& m_promise;
+    Promise<bool, Result> m_promise;
 
-    WaitForCallback(Promise<bool, Result>& promise) : m_promise(promise) {}
+    WaitForCallback(Promise<bool, Result> promise) : m_promise(promise) {}
 
     void operator()(Result result) { m_promise.setValue(result); }
 };
 
 template <typename T>
 struct WaitForCallbackValue {
-    Promise<Result, T>& m_promise;
+    Promise<Result, T> m_promise;
 
-    WaitForCallbackValue(Promise<Result, T>& promise) : m_promise(promise) {}
+    WaitForCallbackValue(Promise<Result, T> promise) : m_promise(promise) {}
 
     void operator()(Result result, const T& value) {
         if (result == ResultOk) {
@@ -53,9 +53,9 @@ struct WaitForCallbackValue {
 
 template <typename T>
 struct WaitForCallbackType {
-    Promise<Result, T>& m_promise;
+    Promise<Result, T> m_promise;
 
-    WaitForCallbackType(Promise<Result, T>& promise) : m_promise(promise) {}
+    WaitForCallbackType(Promise<Result, T> promise) : m_promise(promise) {}
 
     void operator()(T result) { m_promise.setValue(result); }
 };
diff --git a/pulsar-client-cpp/lib/auth/AuthAthenz.cc b/pulsar-client-cpp/lib/auth/AuthAthenz.cc
index 50ec910..0920d10 100644
--- a/pulsar-client-cpp/lib/auth/AuthAthenz.cc
+++ b/pulsar-client-cpp/lib/auth/AuthAthenz.cc
@@ -32,13 +32,13 @@
 #include <json/value.h>
 #include <json/reader.h>
 
-#include <boost/ref.hpp>
+#include <functional>
 
 DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 AuthDataAthenz::AuthDataAthenz(ParamMap& params) {
-    ztsClient_ = std::make_shared<ZTSClient>(boost::ref(params));
+    ztsClient_ = std::make_shared<ZTSClient>(std::ref(params));
     LOG_DEBUG("AuthDataAthenz is construted.")
 }
 
diff --git a/pulsar-client-cpp/lib/auth/AuthToken.cc b/pulsar-client-cpp/lib/auth/AuthToken.cc
index bebfe4e..e377139 100644
--- a/pulsar-client-cpp/lib/auth/AuthToken.cc
+++ b/pulsar-client-cpp/lib/auth/AuthToken.cc
@@ -19,7 +19,7 @@
 #include "AuthToken.h"
 
 #include <boost/algorithm/string/predicate.hpp>
-#include <boost/bind.hpp>
+#include <functional>
 
 #include <sstream>
 #include <fstream>
@@ -65,14 +65,14 @@ AuthToken::~AuthToken() {}
 
 AuthenticationPtr AuthToken::create(ParamMap &params) {
     if (params.find("token") != params.end()) {
-        return create(boost::bind(&readDirect, params["token"]));
+        return create(std::bind(&readDirect, params["token"]));
     } else if (params.find("file") != params.end()) {
         // Read token from a file
-        return create(boost::bind(&readFromFile, params["file"]));
+        return create(std::bind(&readFromFile, params["file"]));
     } else if (params.find("env") != params.end()) {
         // Read token from environment variable
         std::string envVarName = params["env"];
-        return create(boost::bind(&readFromEnv, envVarName));
+        return create(std::bind(&readFromEnv, envVarName));
     } else {
         throw "Invalid configuration for token provider";
     }
@@ -99,7 +99,7 @@ AuthenticationPtr AuthToken::create(const std::string &authParamsString) {
 }
 
 AuthenticationPtr AuthToken::createWithToken(const std::string &token) {
-    return create(boost::bind(&readDirect, token));
+    return create(std::bind(&readDirect, token));
 }
 
 AuthenticationPtr AuthToken::create(const TokenSupplier &tokenSupplier) {
diff --git a/pulsar-client-cpp/lib/c/c_Authentication.cc b/pulsar-client-cpp/lib/c/c_Authentication.cc
index 83d344b..0485a57 100644
--- a/pulsar-client-cpp/lib/c/c_Authentication.cc
+++ b/pulsar-client-cpp/lib/c/c_Authentication.cc
@@ -63,6 +63,6 @@ static std::string tokenSupplierWrapper(token_supplier supplier, void *ctx) {
 pulsar_authentication_t *pulsar_authentication_token_create_with_supplier(token_supplier tokenSupplier,
                                                                           void *ctx) {
     pulsar_authentication_t *authentication = new pulsar_authentication_t;
-    authentication->auth = pulsar::AuthToken::create(boost::bind(&tokenSupplierWrapper, tokenSupplier, ctx));
+    authentication->auth = pulsar::AuthToken::create(std::bind(&tokenSupplierWrapper, tokenSupplier, ctx));
     return authentication;
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc
index f890305..74da773 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -19,8 +19,6 @@
 
 #include <pulsar/c/client.h>
 
-#include <boost/bind.hpp>
-
 #include "c_structs.h"
 
 pulsar_client_t *pulsar_client_create(const char *serviceUrl,
@@ -61,7 +59,8 @@ void pulsar_client_create_producer_async(pulsar_client_t *client, const char *to
                                          const pulsar_producer_configuration_t *conf,
                                          pulsar_create_producer_callback callback, void *ctx) {
     client->client->createProducerAsync(topic, conf->conf,
-                                        boost::bind(&handle_create_producer_callback, _1, _2, callback, ctx));
+                                        std::bind(&handle_create_producer_callback, std::placeholders::_1,
+                                                  std::placeholders::_2, callback, ctx));
 }
 
 pulsar_result pulsar_client_subscribe(pulsar_client_t *client, const char *topic,
@@ -94,8 +93,9 @@ static void handle_subscribe_callback(pulsar::Result result, pulsar::Consumer co
 void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, const char *subscriptionName,
                                    const pulsar_consumer_configuration_t *conf,
                                    pulsar_subscribe_callback callback, void *ctx) {
-    client->client->subscribeAsync(topic, subscriptionName, conf->consumerConfiguration,
-                                   boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+    client->client->subscribeAsync(
+        topic, subscriptionName, conf->consumerConfiguration,
+        std::bind(&handle_subscribe_callback, std::placeholders::_1, std::placeholders::_2, callback, ctx));
 }
 
 void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
@@ -107,16 +107,18 @@ void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const c
         topicsList.push_back(topics[i]);
     }
 
-    client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration,
-                                   boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+    client->client->subscribeAsync(
+        topicsList, subscriptionName, conf->consumerConfiguration,
+        std::bind(&handle_subscribe_callback, std::placeholders::_1, std::placeholders::_2, callback, ctx));
 }
 
 void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
                                            const char *subscriptionName,
                                            const pulsar_consumer_configuration_t *conf,
                                            pulsar_subscribe_callback callback, void *ctx) {
-    client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration,
-                                            boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+    client->client->subscribeWithRegexAsync(
+        topicPattern, subscriptionName, conf->consumerConfiguration,
+        std::bind(&handle_subscribe_callback, std::placeholders::_1, std::placeholders::_2, callback, ctx));
 }
 
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
@@ -148,8 +150,9 @@ void pulsar_client_create_reader_async(pulsar_client_t *client, const char *topi
                                        const pulsar_message_id_t *startMessageId,
                                        pulsar_reader_configuration_t *conf, pulsar_reader_callback callback,
                                        void *ctx) {
-    client->client->createReaderAsync(topic, startMessageId->messageId, conf->conf,
-                                      boost::bind(&handle_reader_callback, _1, _2, callback, ctx));
+    client->client->createReaderAsync(
+        topic, startMessageId->messageId, conf->conf,
+        std::bind(&handle_reader_callback, std::placeholders::_1, std::placeholders::_2, callback, ctx));
 }
 
 pulsar_result pulsar_client_get_topic_partitions(pulsar_client_t *client, const char *topic,
@@ -188,7 +191,8 @@ static void handle_get_partitions_callback(pulsar::Result result,
 void pulsar_client_get_topic_partitions_async(pulsar_client_t *client, const char *topic,
                                               pulsar_get_partitions_callback callback, void *ctx) {
     client->client->getPartitionsForTopicAsync(
-        topic, boost::bind(&handle_get_partitions_callback, _1, _2, callback, ctx));
+        topic, std::bind(&handle_get_partitions_callback, std::placeholders::_1, std::placeholders::_2,
+                         callback, ctx));
 }
 
 pulsar_result pulsar_client_close(pulsar_client_t *client) { return (pulsar_result)client->client->close(); }
@@ -198,5 +202,5 @@ static void handle_client_close(pulsar::Result result, pulsar_close_callback cal
 }
 
 void pulsar_client_close_async(pulsar_client_t *client, pulsar_close_callback callback, void *ctx) {
-    client->client->closeAsync(boost::bind(handle_client_close, _1, callback, ctx));
+    client->client->closeAsync(std::bind(handle_client_close, std::placeholders::_1, callback, ctx));
 }
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
index f9a211d..fed1683 100644
--- a/pulsar-client-cpp/lib/c/c_Consumer.cc
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -35,7 +35,8 @@ pulsar_result pulsar_consumer_unsubscribe(pulsar_consumer_t *consumer) {
 
 void pulsar_consumer_unsubscribe_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
                                        void *ctx) {
-    consumer->consumer.unsubscribeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
+    consumer->consumer.unsubscribeAsync(
+        std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_receive(pulsar_consumer_t *consumer, pulsar_message_t **msg) {
@@ -69,14 +70,14 @@ pulsar_result pulsar_consumer_acknowledge_id(pulsar_consumer_t *consumer, pulsar
 
 void pulsar_consumer_acknowledge_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
                                        pulsar_result_callback callback, void *ctx) {
-    consumer->consumer.acknowledgeAsync(message->message,
-                                        boost::bind(handle_result_callback, _1, callback, ctx));
+    consumer->consumer.acknowledgeAsync(
+        message->message, std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 void pulsar_consumer_acknowledge_async_id(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
                                           pulsar_result_callback callback, void *ctx) {
-    consumer->consumer.acknowledgeAsync(messageId->messageId,
-                                        boost::bind(handle_result_callback, _1, callback, ctx));
+    consumer->consumer.acknowledgeAsync(
+        messageId->messageId, std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_acknowledge_cumulative(pulsar_consumer_t *consumer, pulsar_message_t *message) {
@@ -90,15 +91,15 @@ pulsar_result pulsar_consumer_acknowledge_cumulative_id(pulsar_consumer_t *consu
 
 void pulsar_consumer_acknowledge_cumulative_async(pulsar_consumer_t *consumer, pulsar_message_t *message,
                                                   pulsar_result_callback callback, void *ctx) {
-    consumer->consumer.acknowledgeCumulativeAsync(message->message,
-                                                  boost::bind(handle_result_callback, _1, callback, ctx));
+    consumer->consumer.acknowledgeCumulativeAsync(
+        message->message, std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t *consumer,
                                                      pulsar_message_id_t *messageId,
                                                      pulsar_result_callback callback, void *ctx) {
-    consumer->consumer.acknowledgeCumulativeAsync(messageId->messageId,
-                                                  boost::bind(handle_result_callback, _1, callback, ctx));
+    consumer->consumer.acknowledgeCumulativeAsync(
+        messageId->messageId, std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
@@ -106,7 +107,7 @@ pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
 }
 
 void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback, void *ctx) {
-    consumer->consumer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
+    consumer->consumer.closeAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 void pulsar_consumer_free(pulsar_consumer_t *consumer) { delete consumer; }
@@ -126,7 +127,7 @@ void pulsar_consumer_redeliver_unacknowledged_messages(pulsar_consumer_t *consum
 void pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,
                                 pulsar_result_callback callback, void *ctx) {
     consumer->consumer.seekAsync(messageId->messageId,
-                                 boost::bind(handle_result_callback, _1, callback, ctx));
+                                 std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 pulsar_result pulsar_consumer_seek(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId) {
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index 75cdc47..924060d 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -52,8 +52,8 @@ static void message_listener_callback(pulsar::Consumer consumer, const pulsar::M
 void pulsar_consumer_configuration_set_message_listener(
     pulsar_consumer_configuration_t *consumer_configuration, pulsar_message_listener messageListener,
     void *ctx) {
-    consumer_configuration->consumerConfiguration.setMessageListener(
-        boost::bind(message_listener_callback, _1, _2, messageListener, ctx));
+    consumer_configuration->consumerConfiguration.setMessageListener(std::bind(
+        message_listener_callback, std::placeholders::_1, std::placeholders::_2, messageListener, ctx));
 }
 
 int pulsar_consumer_configuration_has_message_listener(
diff --git a/pulsar-client-cpp/lib/c/c_Producer.cc b/pulsar-client-cpp/lib/c/c_Producer.cc
index cb8e660..63fffe9 100644
--- a/pulsar-client-cpp/lib/c/c_Producer.cc
+++ b/pulsar-client-cpp/lib/c/c_Producer.cc
@@ -17,8 +17,6 @@
  * under the License.
  */
 
-#include <boost/bind.hpp>
-
 #include <pulsar/c/producer.h>
 
 #include "c_structs.h"
@@ -46,7 +44,8 @@ static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg, p
 void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
                                 pulsar_send_callback callback, void *ctx) {
     msg->message = msg->builder.build();
-    producer->producer.sendAsync(msg->message, boost::bind(&handle_producer_send, _1, msg, callback, ctx));
+    producer->producer.sendAsync(msg->message,
+                                 std::bind(&handle_producer_send, std::placeholders::_1, msg, callback, ctx));
 }
 
 int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
@@ -58,7 +57,7 @@ pulsar_result pulsar_producer_close(pulsar_producer_t *producer) {
 }
 
 void pulsar_producer_close_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) {
-    producer->producer.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
+    producer->producer.closeAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 pulsar_result pulsar_producer_flush(pulsar_producer_t *producer) {
@@ -66,5 +65,5 @@ pulsar_result pulsar_producer_flush(pulsar_producer_t *producer) {
 }
 
 void pulsar_producer_flush_async(pulsar_producer_t *producer, pulsar_close_callback callback, void *ctx) {
-    producer->producer.flushAsync(boost::bind(handle_result_callback, _1, callback, ctx));
+    producer->producer.flushAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
diff --git a/pulsar-client-cpp/lib/c/c_Reader.cc b/pulsar-client-cpp/lib/c/c_Reader.cc
index 334c861..e38b8e6 100644
--- a/pulsar-client-cpp/lib/c/c_Reader.cc
+++ b/pulsar-client-cpp/lib/c/c_Reader.cc
@@ -48,7 +48,7 @@ pulsar_result pulsar_reader_read_next_with_timeout(pulsar_reader_t *reader, puls
 pulsar_result pulsar_reader_close(pulsar_reader_t *reader) { return (pulsar_result)reader->reader.close(); }
 
 void pulsar_reader_close_async(pulsar_reader_t *reader, pulsar_result_callback callback, void *ctx) {
-    reader->reader.closeAsync(boost::bind(handle_result_callback, _1, callback, ctx));
+    reader->reader.closeAsync(std::bind(handle_result_callback, std::placeholders::_1, callback, ctx));
 }
 
 void pulsar_reader_free(pulsar_reader_t *reader) { delete reader; }
diff --git a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
index 9b764f5..9d3a1a0 100644
--- a/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ReaderConfiguration.cc
@@ -42,7 +42,8 @@ static void message_listener_callback(pulsar::Reader reader, const pulsar::Messa
 
 void pulsar_reader_configuration_set_reader_listener(pulsar_reader_configuration_t *configuration,
                                                      pulsar_reader_listener listener, void *ctx) {
-    configuration->conf.setReaderListener(boost::bind(message_listener_callback, _1, _2, listener, ctx));
+    configuration->conf.setReaderListener(
+        std::bind(message_listener_callback, std::placeholders::_1, std::placeholders::_2, listener, ctx));
 }
 
 int pulsar_reader_configuration_has_reader_listener(pulsar_reader_configuration_t *configuration) {
diff --git a/pulsar-client-cpp/lib/c/c_structs.h b/pulsar-client-cpp/lib/c/c_structs.h
index 6ff115b..a1fe285 100644
--- a/pulsar-client-cpp/lib/c/c_structs.h
+++ b/pulsar-client-cpp/lib/c/c_structs.h
@@ -22,7 +22,7 @@
 #include <pulsar/Client.h>
 
 #include <memory>
-#include <boost/bind.hpp>
+#include <functional>
 
 struct _pulsar_client {
     std::unique_ptr<pulsar::Client> client;
diff --git a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
index 2bb5e63..b3ef0d2 100644
--- a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
+++ b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
@@ -20,6 +20,8 @@
 #include <lib/stats/ConsumerStatsImpl.h>
 #include <lib/LogUtils.h>
 
+#include <functional>
+
 namespace pulsar {
 DECLARE_LOG_OBJECT();
 
@@ -31,8 +33,7 @@ ConsumerStatsImpl::ConsumerStatsImpl(std::string consumerStr, DeadlineTimerPtr t
       totalNumBytesRecieved_(0),
       numBytesRecieved_(0) {
     timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
-    timer_->async_wait(
-        boost::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, boost::asio::placeholders::error));
+    timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1));
 }
 
 ConsumerStatsImpl::ConsumerStatsImpl(const ConsumerStatsImpl& stats)
@@ -59,8 +60,7 @@ void ConsumerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
     lock.unlock();
 
     timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
-    timer_->async_wait(
-        boost::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, boost::asio::placeholders::error));
+    timer_->async_wait(std::bind(&pulsar::ConsumerStatsImpl::flushAndReset, this, std::placeholders::_1));
     LOG_INFO(tmp);
 }
 
diff --git a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
index 4c8e8f6..9195980 100644
--- a/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
@@ -22,7 +22,6 @@
 
 #include <lib/stats/ConsumerStatsBase.h>
 #include <lib/ExecutorService.h>
-#include <boost/bind.hpp>
 #include <lib/Utils.h>
 #include <utility>
 namespace pulsar {
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
index daf9bf8..7cc1c35 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
@@ -52,8 +52,7 @@ ProducerStatsImpl::ProducerStatsImpl(std::string producerStr, DeadlineTimerPtr t
       latencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs),
       totalLatencyAccumulator_(boost::accumulators::tag::extended_p_square::probabilities = probs) {
     timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
-    timer_->async_wait(
-        boost::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, boost::asio::placeholders::error));
+    timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1));
 }
 
 ProducerStatsImpl::ProducerStatsImpl(const ProducerStatsImpl& stats)
@@ -86,8 +85,7 @@ void ProducerStatsImpl::flushAndReset(const boost::system::error_code& ec) {
     lock.unlock();
 
     timer_->expires_from_now(boost::posix_time::seconds(statsIntervalInSeconds_));
-    timer_->async_wait(
-        boost::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, boost::asio::placeholders::error));
+    timer_->async_wait(std::bind(&pulsar::ProducerStatsImpl::flushAndReset, this, std::placeholders::_1));
     LOG_INFO(tmp);
 }
 
diff --git a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
index 2212809..2ef1abc 100644
--- a/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
+++ b/pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
@@ -41,7 +41,6 @@
 #include <iostream>
 #include <vector>
 #include <lib/Utils.h>
-#include <boost/bind.hpp>
 #include <lib/stats/ProducerStatsBase.h>
 
 namespace pulsar {
diff --git a/pulsar-client-cpp/perf/PerfConsumer.cc b/pulsar-client-cpp/perf/PerfConsumer.cc
index db316ad..4413edc 100644
--- a/pulsar-client-cpp/perf/PerfConsumer.cc
+++ b/pulsar-client-cpp/perf/PerfConsumer.cc
@@ -24,11 +24,11 @@ DECLARE_LOG_OBJECT()
 #include <iostream>
 #include <fstream>
 #include <mutex>
+#include <functional>
 
 using namespace std::chrono;
 
 #include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/bind.hpp>
 #include <boost/filesystem.hpp>
 #include <boost/program_options.hpp>
 #include <boost/accumulators/accumulators.hpp>
@@ -205,7 +205,7 @@ void startPerfConsumer(const Arguments& args) {
             }
 
             client.subscribeAsync(topic, subscriberName, consumerConf,
-                                  boost::bind(handleSubscribe, _1, _2, latch));
+                                  std::bind(handleSubscribe, std::placeholders::_1, std::placeholders::_2, latch));
         }
     }
 
diff --git a/pulsar-client-cpp/perf/PerfProducer.cc b/pulsar-client-cpp/perf/PerfProducer.cc
index e36728b..4e8a434 100644
--- a/pulsar-client-cpp/perf/PerfProducer.cc
+++ b/pulsar-client-cpp/perf/PerfProducer.cc
@@ -20,8 +20,6 @@
 DECLARE_LOG_OBJECT()
 
 #include <mutex>
-
-#include <boost/bind.hpp>
 #include <boost/filesystem.hpp>
 
 #include <boost/accumulators/accumulators.hpp>
@@ -31,6 +29,7 @@ DECLARE_LOG_OBJECT()
 #include <boost/program_options/variables_map.hpp>
 #include <boost/program_options.hpp>
 #include <thread>
+#include <functional>
 namespace po = boost::program_options;
 
 #include <atomic>
@@ -152,7 +151,7 @@ void runProducer(const Arguments& args, std::string topicName, int threadIndex,
         }
         pulsar::Message msg = builder.create().setAllocatedContent(payload.get(), args.msgSize).build();
 
-        producer.sendAsync(msg, boost::bind(sendCallback, _1, _2, Clock::now()));
+        producer.sendAsync(msg, std::bind(sendCallback, std::placeholders::_1, std::placeholders::_2, Clock::now()));
         if (exitCondition) {
             LOG_INFO("Thread interrupted. Exiting producer thread.");
             break;
@@ -185,7 +184,7 @@ void startPerfProducer(const Arguments& args, pulsar::ProducerConfiguration &pro
 
             for (int k = 0; k < args.numOfThreadsPerProducer; k++) {
                 threadList.push_back(std::thread(
-                        boost::bind(runProducer, args, topic, k, limiter, producerList[i * args.numProducers + j],
+                        std::bind(runProducer, args, topic, k, limiter, producerList[i * args.numProducers + j],
                                     std::cref(exitCondition))));
             }
         }
diff --git a/pulsar-client-cpp/python/src/producer.cc b/pulsar-client-cpp/python/src/producer.cc
index 76e5180..88f2cc0 100644
--- a/pulsar-client-cpp/python/src/producer.cc
+++ b/pulsar-client-cpp/python/src/producer.cc
@@ -18,6 +18,8 @@
  */
 #include "utils.h"
 
+#include <functional>
+
 void Producer_send(Producer& producer, const Message& message) {
     Result res;
     Py_BEGIN_ALLOW_THREADS
@@ -49,7 +51,8 @@ void Producer_sendAsync(Producer& producer, const Message& message, py::object c
     Py_XINCREF(pyCallback);
 
     Py_BEGIN_ALLOW_THREADS
-    producer.sendAsync(message, boost::bind(Producer_sendAsyncCallback, pyCallback, _1, _2));
+    producer.sendAsync(message, std::bind(Producer_sendAsyncCallback, pyCallback,
+            std::placeholders::_1, std::placeholders::_2));
     Py_END_ALLOW_THREADS
 }
 
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 4f04cb7..fdab383 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -35,6 +35,9 @@
 #include <lib/PatternMultiTopicsConsumerImpl.h>
 #include "lib/Future.h"
 #include "lib/Utils.h"
+
+#include <functional>
+
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
@@ -86,8 +89,8 @@ static void receiveCallBack(Result r, const Message& msg, std::string& messageCo
     receiveMutex_.unlock();
 }
 
-static void sendCallBack(Result r, const Message& msg, std::string prefix, double percentage,
-                         uint64_t delayInMicros, int* count) {
+static void sendCallBackWithDelay(Result r, const Message& msg, std::string prefix, double percentage,
+                                  uint64_t delayInMicros, int* count) {
     if ((rand() % 100) <= percentage) {
         usleep(delayInMicros);
     }
@@ -183,7 +186,8 @@ TEST(BasicEndToEndTest, testBatchMessages) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+        producer.sendAsync(
+            msg, std::bind(&sendCallBack, std::placeholders::_1, std::placeholders::_2, prefix, &msgCount));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -213,7 +217,8 @@ void resendMessage(Result r, const Message msg, Producer producer) {
         }
     }
     lock.unlock();
-    producer.sendAsync(MessageBuilder().build(), boost::bind(resendMessage, _1, _2, producer));
+    producer.sendAsync(MessageBuilder().build(),
+                       std::bind(resendMessage, std::placeholders::_1, std::placeholders::_2, producer));
 }
 
 TEST(BasicEndToEndTest, testProduceConsume) {
@@ -855,7 +860,8 @@ TEST(BasicEndToEndTest, testMessageListener) {
     globalCount = 0;
 
     ConsumerConfiguration consumerConfig;
-    consumerConfig.setMessageListener(boost::bind(messageListenerFunction, _1, _2));
+    consumerConfig.setMessageListener(
+        std::bind(messageListenerFunction, std::placeholders::_1, std::placeholders::_2));
     Consumer consumer;
     result = client.subscribe(topicName, "subscription-A", consumerConfig, consumer);
 
@@ -898,7 +904,8 @@ TEST(BasicEndToEndTest, testMessageListenerPause) {
     globalCount = 0;
 
     ConsumerConfiguration consumerConfig;
-    consumerConfig.setMessageListener(boost::bind(messageListenerFunction, _1, _2));
+    consumerConfig.setMessageListener(
+        std::bind(messageListenerFunction, std::placeholders::_1, std::placeholders::_2));
     Consumer consumer;
     // Removing dangling subscription from previous test failures
     result = client.subscribe(topicName, "subscription-name", consumerConfig, consumer);
@@ -955,7 +962,8 @@ TEST(BasicEndToEndTest, testResendViaSendCallback) {
     // Expect timeouts since we have set timeout to 1 ms
     // On receiving timeout send the message using the Pulsar client IO thread via cb function.
     for (int i = 0; i < 10000; i++) {
-        producer.sendAsync(MessageBuilder().build(), boost::bind(resendMessage, _1, _2, producer));
+        producer.sendAsync(MessageBuilder().build(),
+                           std::bind(resendMessage, std::placeholders::_1, std::placeholders::_2, producer));
     }
     // 3 seconds
     usleep(3 * 1000 * 1000);
@@ -1003,7 +1011,8 @@ TEST(BasicEndToEndTest, testStatsLatencies) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, 15, 2 * 1e3, &count));
+        producer.sendAsync(msg, std::bind(&sendCallBackWithDelay, std::placeholders::_1,
+                                          std::placeholders::_2, prefix, 15, 2 * 1e3, &count));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -1513,7 +1522,8 @@ TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
     ConsumerConfiguration consConfig;
     consConfig.setUnAckedMessagesTimeoutMs(10 * 1000);
     Latch latch(2);
-    consConfig.setMessageListener(boost::bind(messageListenerFunctionWithoutAck, _1, _2, latch, content));
+    consConfig.setMessageListener(std::bind(messageListenerFunctionWithoutAck, std::placeholders::_1,
+                                            std::placeholders::_2, latch, content));
     result = client.subscribe(topicName, subName, consConfig, consumer);
     ASSERT_EQ(ResultOk, result);
 
@@ -2115,7 +2125,8 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+        producer.sendAsync(
+            msg, std::bind(&sendCallBack, std::placeholders::_1, std::placeholders::_2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO("sending first half messages in async, should timeout to receive");
@@ -2129,7 +2140,8 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+        producer.sendAsync(
+            msg, std::bind(&sendCallBack, std::placeholders::_1, std::placeholders::_2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO("sending the other half messages in async, should able to receive");
@@ -2339,7 +2351,8 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+        producer.sendAsync(
+            msg, std::bind(&sendCallBack, std::placeholders::_1, std::placeholders::_2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO("sending half of messages in async, should timeout to receive");
@@ -2362,7 +2375,8 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+        producer.sendAsync(
+            msg, std::bind(&sendCallBack, std::placeholders::_1, std::placeholders::_2, prefix, &msgCount));
         LOG_DEBUG("async sending message " << messageContent);
     }
     LOG_INFO(
@@ -2510,7 +2524,8 @@ TEST(BasicEndToEndTest, testReceiveAsync) {
     int totalMsgs = 5;
     bool isFailed = false;
     for (int i = 0; i < totalMsgs; i++) {
-        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, true, &isFailed, &count));
+        consumer.receiveAsync(std::bind(&receiveCallBack, std::placeholders::_1, std::placeholders::_2,
+                                        content, true, &isFailed, &count));
     }
     // Send synchronously
     for (int i = 0; i < totalMsgs; i++) {
@@ -2555,7 +2570,8 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsync) {
     int count = 0;
     bool isFailed = false;
     for (int i = 0; i < totalMsgs; i++) {
-        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
+        consumer.receiveAsync(std::bind(&receiveCallBack, std::placeholders::_1, std::placeholders::_2,
+                                        content, false, &isFailed, &count));
     }
 
     for (int i = 0; i < totalMsgs; i++) {
@@ -2625,7 +2641,8 @@ TEST(BasicEndToEndTest, testBatchMessagesReceiveAsync) {
     int count = 0;
     bool isFailed = false;
     for (int i = 0; i < numOfMessages; i++) {
-        consumer.receiveAsync(boost::bind(&receiveCallBack, _1, _2, content, false, &isFailed, &count));
+        consumer.receiveAsync(std::bind(&receiveCallBack, std::placeholders::_1, std::placeholders::_2,
+                                        content, false, &isFailed, &count));
     }
 
     // Send Asynchronously
@@ -2635,7 +2652,8 @@ TEST(BasicEndToEndTest, testBatchMessagesReceiveAsync) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix, &msgCount));
+        producer.sendAsync(
+            msg, std::bind(&sendCallBack, std::placeholders::_1, std::placeholders::_2, prefix, &msgCount));
         LOG_DEBUG("sending message " << messageContent);
     }
 
@@ -2667,16 +2685,16 @@ TEST(BasicEndToEndTest, testReceiveAsyncFailedConsumer) {
     std::string content;
     int closingCunt = 0;
     // callback should immediately fail
-    consumer.receiveAsync(
-        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCunt));
+    consumer.receiveAsync(std::bind(&receiveCallBack, std::placeholders::_1, std::placeholders::_2, content,
+                                    false, &isFailedOnConsumerClosing, &closingCunt));
 
     // close consumer
     consumer.close();
     bool isFailedOnConsumerClosed = false;
     int count = 0;
     // callback should immediately fail
-    consumer.receiveAsync(
-        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));
+    consumer.receiveAsync(std::bind(&receiveCallBack, std::placeholders::_1, std::placeholders::_2, content,
+                                    false, &isFailedOnConsumerClosed, &count));
 
     // check strategically
     for (int i = 0; i < 3; i++) {
@@ -2712,15 +2730,15 @@ TEST(BasicEndToEndTest, testPartitionedReceiveAsyncFailedConsumer) {
     std::string content;
     int closingCunt = 0;
     // callback should immediately fail
-    consumer.receiveAsync(
-        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosing, &closingCunt));
+    consumer.receiveAsync(std::bind(&receiveCallBack, std::placeholders::_1, std::placeholders::_2, content,
+                                    false, &isFailedOnConsumerClosing, &closingCunt));
     // close consumer
     consumer.close();
 
     int count = 0;
     bool isFailedOnConsumerClosed = false;
-    consumer.receiveAsync(
-        boost::bind(&receiveCallBack, _1, _2, content, false, &isFailedOnConsumerClosed, &count));
+    consumer.receiveAsync(std::bind(&receiveCallBack, std::placeholders::_1, std::placeholders::_2, content,
+                                    false, &isFailedOnConsumerClosed, &count));
 
     // check strategically
     for (int i = 0; i < 3; i++) {
diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
index 133f94c..e4ff1b0 100644
--- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
@@ -29,6 +29,8 @@
 #include <lib/Latch.h>
 #include <lib/PartitionedConsumerImpl.h>
 #include <lib/TopicName.h>
+
+#include <functional>
 DECLARE_LOG_OBJECT();
 
 using namespace pulsar;
@@ -89,8 +91,9 @@ TEST(ConsumerStatsTest, testBacklogInfo) {
     }
 
     LOG_DEBUG("Calling consumer.getBrokerConsumerStats");
-    consumer.getBrokerConsumerStatsAsync(
-        boost::bind(simpleCallbackFunction, _1, _2, ResultOk, numOfMessages, ConsumerExclusive));
+    consumer.getBrokerConsumerStatsAsync(std::bind(simpleCallbackFunction, std::placeholders::_1,
+                                                   std::placeholders::_2, ResultOk, numOfMessages,
+                                                   ConsumerExclusive));
 
     for (int i = numOfMessages; i < (numOfMessages * 2); i++) {
         std::string messageContent = prefix + std::to_string(i);
@@ -279,7 +282,8 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
 
     // Expecting return from 4 callbacks
     Latch latch(4);
-    consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 5, latch, 0));
+    consumer.getBrokerConsumerStatsAsync(
+        std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 5, latch, 0));
 
     // Now we have 10 messages per partition
     for (int i = numOfMessages; i < (numOfMessages * 2); i++) {
@@ -289,11 +293,13 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
     }
 
     // Expecting cached result
-    consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 5, latch, 0));
+    consumer.getBrokerConsumerStatsAsync(
+        std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 5, latch, 0));
 
     usleep(4.5 * 1000 * 1000);
     // Expecting fresh results
-    consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 2));
+    consumer.getBrokerConsumerStatsAsync(
+        std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 10, latch, 2));
 
     Message msg;
     while (consumer.receive(msg)) {
@@ -301,7 +307,8 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
     }
 
     // Expecting the backlog to be the same since we didn't acknowledge the messages
-    consumer.getBrokerConsumerStatsAsync(boost::bind(partitionedCallbackFunction, _1, _2, 10, latch, 3));
+    consumer.getBrokerConsumerStatsAsync(
+        std::bind(partitionedCallbackFunction, std::placeholders::_1, std::placeholders::_2, 10, latch, 3));
 
     // Wait for ten seconds only
     ASSERT_TRUE(latch.wait(std::chrono::seconds(10)));
diff --git a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
index 3b22a30..53b6c9d 100644
--- a/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
+++ b/pulsar-client-cpp/tests/ZeroQueueSizeTest.cc
@@ -20,6 +20,8 @@
 #include <pulsar/Client.h>
 #include <lib/Latch.h>
 #include "ConsumerTest.h"
+#include <functional>
+
 DECLARE_LOG_OBJECT()
 
 using namespace pulsar;
@@ -91,7 +93,8 @@ TEST(ZeroQueueSizeTest, testMessageListener) {
     ConsumerConfiguration consConfig;
     consConfig.setReceiverQueueSize(0);
     Latch latch(totalMessages);
-    consConfig.setMessageListener(boost::bind(messageListenerFunction, _1, _2, latch));
+    consConfig.setMessageListener(
+        std::bind(messageListenerFunction, std::placeholders::_1, std::placeholders::_2, latch));
     result = client.subscribe(topicName, subName, consConfig, consumer);
     ASSERT_EQ(ResultOk, result);
 


Mime
View raw message