pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Negative acks in C++ client (#3750)
Date Wed, 13 Mar 2019 23:50:58 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d44850  Negative acks in C++ client (#3750)
2d44850 is described below

commit 2d44850ae8721f92432a6cb36ce6f2e482303045
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Wed Mar 13 16:50:53 2019 -0700

    Negative acks in C++ client (#3750)
    
    * Negative acks in C++ client
    
    * Fixed formatting
    
    * Fixed api docs
---
 pulsar-client-cpp/include/pulsar/Consumer.h        |  64 +++++++++++++
 .../include/pulsar/ConsumerConfiguration.h         |  21 ++++
 pulsar-client-cpp/include/pulsar/MessageId.h       |   2 +-
 pulsar-client-cpp/include/pulsar/c/consumer.h      |  28 ++++++
 .../include/pulsar/c/consumer_configuration.h      |  24 +++++
 pulsar-client-cpp/lib/Commands.cc                  |   8 +-
 pulsar-client-cpp/lib/Commands.h                   |   5 +-
 pulsar-client-cpp/lib/Consumer.cc                  |   9 ++
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |   8 ++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   4 +
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  16 +++-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   7 ++
 pulsar-client-cpp/lib/ConsumerImplBase.h           |   3 +
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   |   9 ++
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   2 +
 pulsar-client-cpp/lib/NegativeAcksTracker.cc       |  98 +++++++++++++++++++
 pulsar-client-cpp/lib/NegativeAcksTracker.h        |  62 ++++++++++++
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |   6 ++
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |   2 +
 pulsar-client-cpp/lib/c/c_Consumer.cc              |   8 ++
 pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc |   9 ++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 106 +++++++++++++++++----
 22 files changed, 475 insertions(+), 26 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index d982833..7dee15f 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -171,6 +171,70 @@ class Consumer {
     void acknowledgeCumulativeAsync(const Message& message, ResultCallback callback);
     void acknowledgeCumulativeAsync(const MessageId& messageId, ResultCallback callback);
 
+    /**
+     * Acknowledge the failure to process a single message.
+     * <p>
+     * When a message is "negatively acked" it will be marked for redelivery after
+     * some fixed delay. The delay is configurable when constructing the consumer
+     * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+     * <p>
+     * This call is not blocking.
+     *
+     * <p>
+     * Example of usage:
+     * <pre><code>
+     * while (true) {
+     *     Message msg;
+     *     consumer.receive(msg);
+     *
+     *     try {
+     *          // Process message...
+     *
+     *          consumer.acknowledge(msg);
+     *     } catch (Throwable t) {
+     *          log.warn("Failed to process message");
+     *          consumer.negativeAcknowledge(msg);
+     *     }
+     * }
+     * </code></pre>
+     *
+     * @param message
+     *            The {@code Message} to be acknowledged
+     */
+    void negativeAcknowledge(const Message& message);
+
+    /**
+     * Acknowledge the failure to process a single message.
+     * <p>
+     * When a message is "negatively acked" it will be marked for redelivery after
+     * some fixed delay. The delay is configurable when constructing the consumer
+     * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+     * <p>
+     * This call is not blocking.
+     *
+     * <p>
+     * Example of usage:
+     * <pre><code>
+     * while (true) {
+     *     Message msg;
+     *     consumer.receive(msg);
+     *
+     *     try {
+     *          // Process message...
+     *
+     *          consumer.acknowledge(msg);
+     *     } catch (Throwable t) {
+     *          log.warn("Failed to process message");
+     *          consumer.negativeAcknowledge(msg);
+     *     }
+     * }
+     * </code></pre>
+     *
+     * @param messageId
+     *            The {@code MessageId} to be acknowledged
+     */
+    void negativeAcknowledge(const MessageId& messageId);
+
     Result close();
 
     void closeAsync(ResultCallback callback);
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index e30ab80..362e178 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -150,6 +150,27 @@ class ConsumerConfiguration {
     long getUnAckedMessagesTimeoutMs() const;
 
     /**
+     * Set the delay to wait before re-delivering messages that have failed to be processed.
+     * <p>
+     * When the application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
+     * will be redelivered after a fixed timeout. The default is 1 min.
+     *
+     * @param redeliveryDelay
+     *            redelivery delay for failed messages
+     * @param timeUnit
+     *            unit in which the timeout is provided.
+     * @return the consumer builder instance
+     */
+    void setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis);
+
+    /**
+     * Get the configured delay to wait before re-delivering messages that have failed to be processed.
+     *
+     * @return redelivery delay for failed messages
+     */
+    long getNegativeAckRedeliveryDelayMs() const;
+
+    /**
      * Set the time duration for which the broker side consumer stats will be cached in the client.
      * @param cacheTimeInMs in milliseconds
      */
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index e25444b..a8a9276 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -22,7 +22,6 @@
 #include <iosfwd>
 #include <stdint.h>
 #include <memory>
-//#include <lib/MessageIdImpl.h>
 
 #pragma GCC visibility push(default)
 
@@ -87,6 +86,7 @@ class MessageId {
     friend class BatchAcknowledgementTracker;
     friend class PulsarWrapper;
     friend class PulsarFriend;
+    friend class NegativeAcksTracker;
 
     friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId);
 
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer.h b/pulsar-client-cpp/include/pulsar/c/consumer.h
index f350ee0..f84a822 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer.h
@@ -164,6 +164,34 @@ void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t
*consumer
                                                      pulsar_message_id_t *messageId,
                                                      pulsar_result_callback callback, void
*ctx);
 
+/**
+ * Acknowledge the failure to process a single message.
+ * <p>
+ * When a message is "negatively acked" it will be marked for redelivery after
+ * some fixed delay. The delay is configurable when constructing the consumer
+ * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+ * <p>
+ * This call is not blocking.
+ *
+ * @param message
+ *            The {@code Message} to be acknowledged
+ */
+void pulsar_consumer_negative_acknowledge(pulsar_consumer_t *consumer, pulsar_message_t *message);
+
+/**
+ * Acknowledge the failure to process a single message through its message id
+ * <p>
+ * When a message is "negatively acked" it will be marked for redelivery after
+ * some fixed delay. The delay is configurable when constructing the consumer
+ * with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+ * <p>
+ * This call is not blocking.
+ *
+ * @param messageId
+ *            The message id to be acknowledged
+ */
+void pulsar_consumer_negative_acknowledge_id(pulsar_consumer_t *consumer, pulsar_message_id_t
*messageId);
+
 pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer);
 
 void pulsar_consumer_close_async(pulsar_consumer_t *consumer, pulsar_result_callback callback,
void *ctx);
diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
index b90963a..810d062 100644
--- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
@@ -158,6 +158,30 @@ void pulsar_consumer_set_unacked_messages_timeout_ms(pulsar_consumer_configurati
  */
 long pulsar_consumer_get_unacked_messages_timeout_ms(pulsar_consumer_configuration_t *consumer_configuration);
 
+/**
+ * Set the delay to wait before re-delivering messages that have failed to be processed.
+ * <p>
+ * When the application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
+ * will be redelivered after a fixed timeout. The default is 1 min.
+ *
+ * @param redeliveryDelay
+ *            redelivery delay for failed messages
+ * @param timeUnit
+ *            unit in which the timeout is provided.
+ * @return the consumer builder instance
+ */
+void pulsar_configure_set_negative_ack_redelivery_delay_ms(
+    pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis);
+
+/**
+ * Get the configured delay to wait before re-delivering messages that have failed to be processed.
+ *
+ * @param consumer_configuration the consumer conf object
+ * @return redelivery delay for failed messages
+ */
+long pulsar_configure_get_negative_ack_redelivery_delay_ms(
+    pulsar_consumer_configuration_t *consumer_configuration);
+
 int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consumer_configuration);
 
 int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_configuration);
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index 06b96f9..6c5a63b 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -366,11 +366,17 @@ SharedBuffer Commands::newPong() {
     return writeMessageWithSize(cmd);
 }
 
-SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId) {
+SharedBuffer Commands::newRedeliverUnacknowledgedMessages(uint64_t consumerId,
+                                                          const std::set<MessageId>&
messageIds) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::REDELIVER_UNACKNOWLEDGED_MESSAGES);
     CommandRedeliverUnacknowledgedMessages* command = cmd.mutable_redeliverunacknowledgedmessages();
     command->set_consumer_id(consumerId);
+    for (const auto& msgId : messageIds) {
+        MessageIdData* msgIdData = command->add_message_ids();
+        msgIdData->set_ledgerid(msgId.ledgerId());
+        msgIdData->set_entryid(msgId.entryId());
+    }
     return writeMessageWithSize(cmd);
 }
 
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 967270c..4952089 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -27,6 +27,8 @@
 #include "SharedBuffer.h"
 #include "Utils.h"
 
+#include <set>
+
 using namespace pulsar;
 
 namespace pulsar {
@@ -102,7 +104,8 @@ class Commands {
     static SharedBuffer newPing();
     static SharedBuffer newPong();
 
-    static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId);
+    static SharedBuffer newRedeliverUnacknowledgedMessages(uint64_t consumerId,
+                                                           const std::set<MessageId>&
messageIds);
 
     static std::string messageType(proto::BaseCommand::Type type);
 
diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc
index cbe44fe..a968a91 100644
--- a/pulsar-client-cpp/lib/Consumer.cc
+++ b/pulsar-client-cpp/lib/Consumer.cc
@@ -142,6 +142,15 @@ void Consumer::acknowledgeCumulativeAsync(const MessageId& messageId,
ResultCall
     impl_->acknowledgeCumulativeAsync(messageId, callback);
 }
 
+void Consumer::negativeAcknowledge(const Message& message) { negativeAcknowledge(message.getMessageId());
}
+
+void Consumer::negativeAcknowledge(const MessageId& messageId) {
+    if (impl_) {
+        impl_->negativeAcknowledge(messageId);
+        ;
+    }
+}
+
 Result Consumer::close() {
     Promise<bool, Result> promise;
     closeAsync(WaitForCallback(promise));
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 4530ebd..1d00420 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -92,6 +92,14 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t
milliSeco
     impl_->unAckedMessagesTimeoutMs = milliSeconds;
 }
 
+void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
+    impl_->negativeAckRedeliveryDelay = std::chrono::milliseconds(redeliveryDelayMillis);
+}
+
+long ConsumerConfiguration::getNegativeAckRedeliveryDelayMs() const {
+    return impl_->negativeAckRedeliveryDelay.count();
+}
+
 bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader
!= NULL); }
 
 const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader;
}
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 70f725b..30285ff 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -21,10 +21,14 @@
 
 #include <pulsar/ConsumerConfiguration.h>
 
+#include <chrono>
+
 namespace pulsar {
 struct ConsumerConfigurationImpl {
     SchemaInfo schemaInfo;
     long unAckedMessagesTimeoutMs;
+
+    std::chrono::milliseconds negativeAckRedeliveryDelay;
     ConsumerType consumerType;
     MessageListener messageListener;
     bool hasMessageListener;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 7571d66..980d9f3 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -57,6 +57,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string&
topic,
       batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_),
       brokerConsumerStats_(),
       consumerStatsBasePtr_(),
+      negativeAcksTracker_(client, *this, conf),
       msgCrypto_(),
       readCompacted_(conf.isReadCompacted()),
       lastMessageInBroker_(Optional<MessageId>::of(MessageId())) {
@@ -793,6 +794,11 @@ void ConsumerImpl::doAcknowledge(const MessageId& messageId, proto::CommandAck_A
     }
 }
 
+void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
+    unAckedMessageTrackerPtr_->remove(messageId);
+    negativeAcksTracker_.add(messageId);
+}
+
 void ConsumerImpl::disconnectConsumer() {
     LOG_INFO("Broker notification of Closed consumer: " << consumerId_);
     Lock lock(mutex_);
@@ -918,14 +924,16 @@ Result ConsumerImpl::resumeMessageListener() {
 }
 
 void ConsumerImpl::redeliverUnacknowledgedMessages() {
+    static std::set<MessageId> emptySet;
+    redeliverMessages(emptySet);
+}
+
+void ConsumerImpl::redeliverMessages(const std::set<MessageId>& messageIds) {
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         if (cnx->getServerProtocolVersion() >= proto::v2) {
-            cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_));
+            cnx->sendCommand(Commands::newRedeliverUnacknowledgedMessages(consumerId_,
messageIds));
             LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for Consumer - " <<
getConsumerId());
-        } else {
-            LOG_DEBUG("Reconnecting the client to redeliver the messages for Consumer - "
<< getName());
-            cnx->close();
         }
     } else {
         LOG_DEBUG("Connection not ready for Consumer - " << getConsumerId());
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index b5fe761..b917f790 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -26,6 +26,7 @@
 #include "HandlerBase.h"
 #include "ClientConnection.h"
 #include "lib/UnAckedMessageTrackerEnabled.h"
+#include "NegativeAcksTracker.h"
 #include "Commands.h"
 #include "ExecutorService.h"
 #include "ConsumerImplBase.h"
@@ -92,7 +93,12 @@ class ConsumerImpl : public ConsumerImplBase,
     virtual void receiveAsync(ReceiveCallback& callback);
     Result fetchSingleMessageFromBroker(Message& msg);
     virtual void acknowledgeAsync(const MessageId& msgId, ResultCallback callback);
+
     virtual void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback);
+
+    virtual void redeliverMessages(const std::set<MessageId>& messageIds);
+    virtual void negativeAcknowledge(const MessageId& msgId);
+
     virtual void closeAsync(ResultCallback callback);
     virtual void start();
     virtual void shutdown();
@@ -169,6 +175,7 @@ class ConsumerImpl : public ConsumerImplBase,
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
+    NegativeAcksTracker negativeAcksTracker_;
 
     MessageCryptoPtr msgCrypto_;
     const bool readCompacted_;
diff --git a/pulsar-client-cpp/lib/ConsumerImplBase.h b/pulsar-client-cpp/lib/ConsumerImplBase.h
index 766b0a9..c716270 100644
--- a/pulsar-client-cpp/lib/ConsumerImplBase.h
+++ b/pulsar-client-cpp/lib/ConsumerImplBase.h
@@ -21,6 +21,8 @@
 #include <pulsar/Message.h>
 #include <pulsar/Consumer.h>
 
+#include <set>
+
 namespace pulsar {
 class ConsumerImplBase;
 
@@ -50,6 +52,7 @@ class ConsumerImplBase {
     virtual int getNumOfPrefetchedMessages() const = 0;
     virtual void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) = 0;
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback) = 0;
+    virtual void negativeAcknowledge(const MessageId& msgId) = 0;
 };
 }  // namespace pulsar
 #endif  // PULSAR_CONSUMER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index 963c20f..2c01da5 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -563,6 +563,15 @@ void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId&
msgId,
     callback(ResultOperationNotSupported);
 }
 
+void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
+    auto iterator = consumers_.find(msgId.getTopicName());
+
+    if (consumers_.end() != iterator) {
+        unAckedMessageTrackerPtr_->remove(msgId);
+        iterator->second->negativeAcknowledge(msgId);
+    }
+}
+
 MultiTopicsConsumerImpl::~MultiTopicsConsumerImpl() {}
 
 Future<Result, ConsumerImplBaseWeakPtr> MultiTopicsConsumerImpl::getConsumerCreatedFuture()
{
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 74350b9..757c150 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -80,6 +80,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     // not supported
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
 
+    virtual void negativeAcknowledge(const MessageId& msgId);
+
    protected:
     const ClientImplPtr client_;
     const std::string subscriptionName_;
diff --git a/pulsar-client-cpp/lib/NegativeAcksTracker.cc b/pulsar-client-cpp/lib/NegativeAcksTracker.cc
new file mode 100644
index 0000000..66afd1b
--- /dev/null
+++ b/pulsar-client-cpp/lib/NegativeAcksTracker.cc
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "NegativeAcksTracker.h"
+
+#include "ConsumerImpl.h"
+
+#include <set>
+#include <functional>
+
+namespace pulsar {
+const std::chrono::milliseconds NegativeAcksTracker::MIN_NACK_DELAY_NANOS = std::chrono::milliseconds(100);
+
+NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer,
+                                         const ConsumerConfiguration &conf)
+    : consumer_(consumer),
+      nackDelay_(
+          std::max(std::chrono::milliseconds(conf.getNegativeAckRedeliveryDelayMs()), MIN_NACK_DELAY_NANOS)),
+      timerInterval_((long)(nackDelay_.count() / 3)),
+      executor_(client->getIOExecutorProvider()->get()) {}
+
+void NegativeAcksTracker::scheduleTimer() {
+    timer_ = executor_->createDeadlineTimer();
+    timer_->expires_from_now(timerInterval_);
+    timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1));
+}
+
+void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    timer_ = nullptr;
+
+    if (ec) {
+        // Ignore cancelled events
+        return;
+    }
+
+    if (nackedMessages_.empty()) {
+        return;
+    }
+
+    // Group all the nacked messages into one single re-delivery request
+    std::set<MessageId> messagesToRedeliver;
+
+    auto now = Clock::now();
+
+    for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) {
+        if (it->second < now) {
+            messagesToRedeliver.insert(it->first);
+            it = nackedMessages_.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
+    consumer_.redeliverMessages(messagesToRedeliver);
+    scheduleTimer();
+}
+
+void NegativeAcksTracker::add(const MessageId &m) {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    auto now = Clock::now();
+
+    // Erase batch id to group all nacks from same batch
+    MessageId batchMessageId = MessageId(m.partition(), m.ledgerId(), m.entryId(), -1);
+    nackedMessages_[batchMessageId] = now + nackDelay_;
+
+    if (!timer_) {
+        scheduleTimer();
+    }
+}
+
+void NegativeAcksTracker::close() {
+    std::lock_guard<std::mutex> lock(mutex_);
+
+    if (timer_) {
+        boost::system::error_code ec;
+        timer_->cancel(ec);
+    }
+}
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/NegativeAcksTracker.h b/pulsar-client-cpp/lib/NegativeAcksTracker.h
new file mode 100644
index 0000000..4a1bbcd
--- /dev/null
+++ b/pulsar-client-cpp/lib/NegativeAcksTracker.h
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <pulsar/MessageId.h>
+
+#include "ExecutorService.h"
+#include "ClientImpl.h"
+
+#include <mutex>
+#include <map>
+
+namespace pulsar {
+
+class NegativeAcksTracker {
+   public:
+    NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer, const ConsumerConfiguration
&conf);
+
+    NegativeAcksTracker(const NegativeAcksTracker &) = delete;
+
+    NegativeAcksTracker &operator=(const NegativeAcksTracker &) = delete;
+
+    void add(const MessageId &m);
+
+    void close();
+
+   private:
+    void scheduleTimer();
+    void handleTimer(const boost::system::error_code &ec);
+
+    static const std::chrono::milliseconds MIN_NACK_DELAY_NANOS;
+
+    ConsumerImpl &consumer_;
+    std::mutex mutex_;
+
+    std::chrono::milliseconds nackDelay_;
+    boost::posix_time::milliseconds timerInterval_;
+    typedef typename std::chrono::steady_clock Clock;
+    std::map<MessageId, Clock::time_point> nackedMessages_;
+
+    ExecutorServicePtr executor_;
+    DeadlineTimerPtr timer_;
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 9ce3703..ca4596e 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -183,6 +183,12 @@ void PartitionedConsumerImpl::acknowledgeCumulativeAsync(const MessageId&
msgId,
     callback(ResultOperationNotSupported);
 }
 
+void PartitionedConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
+    int32_t partition = msgId.partition();
+    unAckedMessageTrackerPtr_->remove(msgId);
+    consumers_[partition]->negativeAcknowledge(msgId);
+}
+
 void PartitionedConsumerImpl::start() {
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
     std::shared_ptr<ConsumerImpl> consumer;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index ec79529..1934a8d 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -71,6 +71,8 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
                                 size_t, BrokerConsumerStatsCallback);
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
 
+    virtual void negativeAcknowledge(const MessageId& msgId);
+
    private:
     const ClientImplPtr client_;
     const std::string subscriptionName_;
diff --git a/pulsar-client-cpp/lib/c/c_Consumer.cc b/pulsar-client-cpp/lib/c/c_Consumer.cc
index fed1683..6273ad1 100644
--- a/pulsar-client-cpp/lib/c/c_Consumer.cc
+++ b/pulsar-client-cpp/lib/c/c_Consumer.cc
@@ -102,6 +102,14 @@ void pulsar_consumer_acknowledge_cumulative_async_id(pulsar_consumer_t
*consumer
         messageId->messageId, std::bind(handle_result_callback, std::placeholders::_1,
callback, ctx));
 }
 
+void pulsar_consumer_negative_acknowledge(pulsar_consumer_t *consumer, pulsar_message_t *message)
{
+    consumer->consumer.negativeAcknowledge(message->message);
+}
+
+void pulsar_consumer_negative_acknowledge_id(pulsar_consumer_t *consumer, pulsar_message_id_t
*messageId) {
+    consumer->consumer.negativeAcknowledge(messageId->messageId);
+}
+
 pulsar_result pulsar_consumer_close(pulsar_consumer_t *consumer) {
     return (pulsar_result)consumer->consumer.close();
 }
diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
index 40a6a8f..49cf2be 100644
--- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc
@@ -101,6 +101,15 @@ long pulsar_consumer_get_unacked_messages_timeout_ms(
     return consumer_configuration->consumerConfiguration.getUnAckedMessagesTimeoutMs();
 }
 
+void pulsar_configure_set_negative_ack_redelivery_delay_ms(
+    pulsar_consumer_configuration_t *consumer_configuration, long redeliveryDelayMillis)
{
+    consumer_configuration->consumerConfiguration.setNegativeAckRedeliveryDelayMs(redeliveryDelayMillis);
+}
+long pulsar_configure_get_negative_ack_redelivery_delay_ms(
+    pulsar_consumer_configuration_t *consumer_configuration) {
+    return consumer_configuration->consumerConfiguration.getNegativeAckRedeliveryDelayMs();
+}
+
 int pulsar_consumer_is_encryption_enabled(pulsar_consumer_configuration_t *consumer_configuration)
{
     return consumer_configuration->consumerConfiguration.isEncryptionEnabled();
 }
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index b32ba69..52c2800 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -48,19 +48,20 @@ static int globalCount = 0;
 static long globalResendMessageCount = 0;
 static std::string lookupUrl = "pulsar://localhost:6650";
 static std::string adminUrl = "http://localhost:8080/";
-static void messageListenerFunction(Consumer consumer, const Message& msg) {
+
+static void messageListenerFunction(Consumer consumer, const Message &msg) {
     globalCount++;
     consumer.acknowledge(msg);
 }
 
-static void messageListenerFunctionWithoutAck(Consumer consumer, const Message& msg,
Latch& latch,
-                                              const std::string& content) {
+static void messageListenerFunctionWithoutAck(Consumer consumer, const Message &msg,
Latch &latch,
+                                              const std::string &content) {
     globalCount++;
     ASSERT_EQ(content, msg.getDataAsString());
     latch.countdown();
 }
 
-static void sendCallBack(Result r, const Message& msg, std::string prefix, int* count)
{
+static void sendCallBack(Result r, const Message &msg, std::string prefix, int *count)
{
     static std::mutex sendMutex_;
     sendMutex_.lock();
     ASSERT_EQ(r, ResultOk);
@@ -71,8 +72,8 @@ static void sendCallBack(Result r, const Message& msg, std::string prefix,
int*
     sendMutex_.unlock();
 }
 
-static void receiveCallBack(Result r, const Message& msg, std::string& messageContent,
bool checkContent,
-                            bool* isFailed, int* count) {
+static void receiveCallBack(Result r, const Message &msg, std::string &messageContent,
bool checkContent,
+                            bool *isFailed, int *count) {
     static std::mutex receiveMutex_;
     receiveMutex_.lock();
 
@@ -89,8 +90,8 @@ static void receiveCallBack(Result r, const Message& msg, std::string&
messageCo
     receiveMutex_.unlock();
 }
 
-static void sendCallBackWithDelay(Result r, const Message& msg, std::string prefix, double
percentage,
-                                  uint64_t delayInMicros, int* count) {
+static void sendCallBackWithDelay(Result r, const Message &msg, std::string prefix, double
percentage,
+                                  uint64_t delayInMicros, int *count) {
     if ((rand() % 100) <= percentage) {
         usleep(delayInMicros);
     }
@@ -99,7 +100,7 @@ static void sendCallBackWithDelay(Result r, const Message& msg, std::string
pref
 
 class EncKeyReader : public CryptoKeyReader {
    private:
-    void readFile(std::string fileName, std::string& fileContents) const {
+    void readFile(std::string fileName, std::string &fileContents) const {
         std::ifstream ifs(fileName);
         std::stringstream fileStream;
         fileStream << ifs.rdbuf();
@@ -110,8 +111,8 @@ class EncKeyReader : public CryptoKeyReader {
    public:
     EncKeyReader() {}
 
-    Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>&
metadata,
-                        EncryptionKeyInfo& encKeyInfo) const {
+    Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>
&metadata,
+                        EncryptionKeyInfo &encKeyInfo) const {
         std::string CERT_FILE_PATH =
             "../../pulsar-broker/src/test/resources/certificate/public-key." + keyName;
         std::string keyContents;
@@ -121,8 +122,8 @@ class EncKeyReader : public CryptoKeyReader {
         return ResultOk;
     }
 
-    Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>&
metadata,
-                         EncryptionKeyInfo& encKeyInfo) const {
+    Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>
&metadata,
+                         EncryptionKeyInfo &encKeyInfo) const {
         std::string CERT_FILE_PATH =
             "../../pulsar-broker/src/test/resources/certificate/private-key." + keyName;
         std::string keyContents;
@@ -561,7 +562,7 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
     ASSERT_EQ(ResultOk, result);
 
     int size = Commands::MaxMessageSize + 1;
-    char* content = new char[size];
+    char *content = new char[size];
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer.send(msg);
     ASSERT_EQ(ResultMessageTooBig, result);
@@ -1114,7 +1115,7 @@ TEST(BasicEndToEndTest, testProduceMessageSize) {
     ASSERT_EQ(ResultOk, result);
 
     int size = Commands::MaxMessageSize + 1;
-    char* content = new char[size];
+    char *content = new char[size];
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer1.send(msg);
     ASSERT_EQ(ResultMessageTooBig, result);
@@ -1165,7 +1166,7 @@ TEST(BasicEndToEndTest, testBigMessageSizeBatching) {
     ASSERT_EQ(ResultOk, result);
 
     int size = Commands::MaxMessageSize + 1;
-    char* content = new char[size];
+    char *content = new char[size];
     Message msg = MessageBuilder().setAllocatedContent(content, size).build();
     result = producer1.send(msg);
     ASSERT_EQ(ResultMessageTooBig, result);
@@ -1200,7 +1201,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty(propertyName, std::to_string(i)).build();
         if (i % 3 == 1) {
-            ProducerImpl& pImpl = PulsarFriend::getProducerImpl(producer);
+            ProducerImpl &pImpl = PulsarFriend::getProducerImpl(producer);
             ClientConnectionPtr clientConnectionPtr;
             do {
                 ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(pImpl);
@@ -1222,7 +1223,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
         receivedMsgIndex.insert(msg.getProperty(propertyName));
     }
 
-    ConsumerImpl& cImpl = PulsarFriend::getConsumerImpl(consumer);
+    ConsumerImpl &cImpl = PulsarFriend::getConsumerImpl(consumer);
     ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(cImpl);
     ClientConnectionPtr clientConnectionPtr = clientConnectionWeakPtr.lock();
     oldConnections.push_back(clientConnectionPtr);
@@ -2220,7 +2221,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
 }
 
 // for partitioned reason, it may hard to verify message id.
-static void simpleCallback(Result code, const Message& msg) {
+static void simpleCallback(Result code, const Message &msg) {
     LOG_INFO("Received code: " << code << " -- Msg: " << msg);
 }
 
@@ -2842,3 +2843,70 @@ TEST(BasicEndToEndTest, testDupConsumersOnSharedModeNotThrowsExcOnUnsubscribe)
{
     // If dup consumers are allowed BrokerMetadataError will be the result of close()
     ASSERT_EQ(ResultAlreadyClosed, consumerA.close());
 }
+
+// Publishes 10 messages (batched when `batchingEnabled`), negatively acks each on first delivery,
+// then expects all of them to be redelivered in order and acknowledges them.
+void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
+    Client client(lookupUrl);
+    Consumer consumer;
+    ConsumerConfiguration conf;
+    conf.setNegativeAckRedeliveryDelayMs(100);
+    // The configuration has to be handed to subscribe(); otherwise `conf` is never used.
+    Result result = client.subscribe(topic, "test", conf, consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    Producer producer;
+    ProducerConfiguration producerConf;
+    producerConf.setBatchingEnabled(batchingEnabled);
+    result = client.createProducer(topic, producerConf, producer);
+    ASSERT_EQ(ResultOk, result);
+
+    for (int i = 0; i < 10; i++) {
+        Message msg = MessageBuilder().setContent("test-" + std::to_string(i)).build();
+        producer.sendAsync(msg, nullptr);
+    }
+    producer.flush();
+
+    // First delivery: reject every message with a negative acknowledgment.
+    for (int i = 0; i < 10; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
+        consumer.negativeAcknowledge(msg);
+    }
+
+    // Second delivery: the same payloads must arrive again and are acknowledged this time.
+    for (int i = 0; i < 10; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
+        consumer.acknowledge(msg);
+    }
+
+    // No more messages expected
+    Message msg;
+    Result res = consumer.receive(msg, 100);
+    ASSERT_EQ(ResultTimeout, res);
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testNegativeAcks) {  // negative-ack scenario with producer batching disabled
+    testNegativeAcks("testNegativeAcks-" + std::to_string(time(nullptr)), false);
+}
+
+TEST(BasicEndToEndTest, testNegativeAcksWithBatching) {  // same scenario with producer batching enabled
+    testNegativeAcks("testNegativeAcksWithBatching-" + std::to_string(time(nullptr)), true);
+}
+
+TEST(BasicEndToEndTest, testNegativeAcksWithPartitions) {
+    std::string topic = "testNegativeAcksWithPartitions-" + std::to_string(time(nullptr));
+    // Ask the admin REST endpoint to create the topic with 3 partitions
+    std::string partitionsUrl =
+        adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions";
+    int status = makePutRequest(partitionsUrl, "3");
+    LOG_INFO("res = " << status);
+    // Only these two status codes are accepted; any other value fails the test
+    ASSERT_TRUE(status == 204 || status == 409);
+
+    testNegativeAcks(topic, true);
+}
\ No newline at end of file


Mime
View raw message