pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Added support for "negative acks" in Java client (#3703)
Date Thu, 07 Mar 2019 23:11:15 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1da2161  Added support for "negative acks" in Java client (#3703)
1da2161 is described below

commit 1da21612d48f68db39905842feeaed202f45b684
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Thu Mar 7 15:11:10 2019 -0800

    Added support for "negative acks" in Java client (#3703)
    
    * Added support for "negative acks" in Java client
    
    * Fixed redelivery delay to be >= than configured
    
    * Fixed redelivery after timeout
    
    * Fixed timeout interval calculation
    
    * Removed the 1.1 nonsense
    
    * Fixed test cleanup
    
    * Avoid failure when passing empty set of msg ids
---
 .../pulsar/client/impl/NegativeAcksTest.java       | 155 +++++++++++++++++++++
 .../org/apache/pulsar/client/api/Consumer.java     |  51 +++++++
 .../apache/pulsar/client/api/ConsumerBuilder.java  |  15 ++
 .../apache/pulsar/client/impl/ConsumerBase.java    |   5 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   8 +-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  17 ++-
 .../client/impl/MultiTopicsConsumerImpl.java       |  13 ++
 .../pulsar/client/impl/NegativeAcksTracker.java    |  93 +++++++++++++
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 .../pulsar/PulsarConsumerSourceTests.java          |   8 ++
 10 files changed, 365 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
new file mode 100644
index 0000000..857fc20
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class NegativeAcksTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeClass
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterClass
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "variations")
+    public static Object[][] variations() {
+        return new Object[][] {
+                // batching / partitions / subscription-type / redelivery-delay-ms / ack-timeout
+                { false, false, SubscriptionType.Shared, 100, 0 },
+                { false, false, SubscriptionType.Failover, 100, 0 },
+                { false, true, SubscriptionType.Shared, 100, 0 },
+                { false, true, SubscriptionType.Failover, 100, 0 },
+                { true, false, SubscriptionType.Shared, 100, 0 },
+                { true, false, SubscriptionType.Failover, 100, 0 },
+                { true, true, SubscriptionType.Shared, 100, 0 },
+                { true, true, SubscriptionType.Failover, 100, 0 },
+
+                { false, false, SubscriptionType.Shared, 0, 0 },
+                { false, false, SubscriptionType.Failover, 0, 0 },
+                { false, true, SubscriptionType.Shared, 0, 0 },
+                { false, true, SubscriptionType.Failover, 0, 0 },
+                { true, false, SubscriptionType.Shared, 0, 0 },
+                { true, false, SubscriptionType.Failover, 0, 0 },
+                { true, true, SubscriptionType.Shared, 0, 0 },
+                { true, true, SubscriptionType.Failover, 0, 0 },
+
+                { false, false, SubscriptionType.Shared, 100, 1000 },
+                { false, false, SubscriptionType.Failover, 100, 1000 },
+                { false, true, SubscriptionType.Shared, 100, 1000 },
+                { false, true, SubscriptionType.Failover, 100, 1000 },
+                { true, false, SubscriptionType.Shared, 100, 1000 },
+                { true, false, SubscriptionType.Failover, 100, 1000 },
+                { true, true, SubscriptionType.Shared, 100, 1000 },
+                { true, true, SubscriptionType.Failover, 100, 1000 },
+
+                { false, false, SubscriptionType.Shared, 0, 1000 },
+                { false, false, SubscriptionType.Failover, 0, 1000 },
+                { false, true, SubscriptionType.Shared, 0, 1000 },
+                { false, true, SubscriptionType.Failover, 0, 1000 },
+                { true, false, SubscriptionType.Shared, 0, 1000 },
+                { true, false, SubscriptionType.Failover, 0, 1000 },
+                { true, true, SubscriptionType.Shared, 0, 1000 },
+                { true, true, SubscriptionType.Failover, 0, 1000 },
+        };
+    }
+
+    @Test(dataProvider = "variations")
+    public void testNegativeAcks(boolean batching, boolean usePartitions, SubscriptionType subscriptionType,
+            int negAcksDelayMillis, int ackTimeout)
+            throws Exception {
+        log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions,
+                subscriptionType, negAcksDelayMillis);
+        String topic = "testNegativeAcks-" + System.nanoTime();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub1")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
+                .subscriptionType(subscriptionType)
+                .negativeAckRedeliveryDelay(negAcksDelayMillis, TimeUnit.MILLISECONDS)
+                .ackTimeout(ackTimeout, TimeUnit.MILLISECONDS)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .enableBatching(batching)
+                .create();
+
+        Set<String> sentMessages = new HashSet<>();
+
+        final int N = 10;
+        for (int i = 0; i < N; i++) {
+            String value = "test-" + i;
+            producer.sendAsync(value);
+            sentMessages.add(value);
+        }
+        producer.flush();
+
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            consumer.negativeAcknowledge(msg);
+        }
+
+        Set<String> receivedMessages = new HashSet<>();
+
+        // All the messages should be received again
+        for (int i = 0; i < N; i++) {
+            Message<String> msg = consumer.receive();
+            receivedMessages.add(msg.getValue());
+            consumer.acknowledge(msg);
+        }
+
+        assertEquals(receivedMessages, sentMessages);
+
+        // There should be no more messages
+        assertNull(consumer.receive(100, TimeUnit.MILLISECONDS));
+        consumer.close();
+        producer.close();
+    }
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
index 76c9911..deb84ab 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
@@ -130,6 +130,57 @@ public interface Consumer<T> extends Closeable {
     void acknowledge(MessageId messageId) throws PulsarClientException;
 
     /**
+     * Acknowledge the failure to process a single message.
+     * <p>
+     * When a message is "negatively acked" it will be marked for redelivery after
+     * some fixed delay. The delay is configurable when constructing the consumer
+     * with {@link ConsumerBuilder#negativeAckRedeliveryDelay(long, TimeUnit)}.
+     * <p>
+     * This call is not blocking.
+     *
+     * <p>
+     * Example of usage:
+     * <pre><code>
+     * while (true) {
+     *     Message&lt;String&gt; msg = consumer.receive();
+     *
+     *     try {
+     *          // Process message...
+     *
+     *          consumer.acknowledge(msg);
+     *     } catch (Throwable t) {
+     *          log.warn("Failed to process message");
+     *          consumer.negativeAcknowledge(msg);
+     *     }
+     * }
+     * </code></pre>
+     *
+     * @param message
+     *            The {@code Message} to be negatively acknowledged
+     */
+    void negativeAcknowledge(Message<?> message);
+
+    /**
+     * Acknowledge the failure to process a single message.
+     * <p>
+     * When a message is "negatively acked" it will be marked for redelivery after
+     * some fixed delay. The delay is configurable when constructing the consumer
+     * with {@link ConsumerBuilder#negativeAckRedeliveryDelay(long, TimeUnit)}.
+     * <p>
+     * This call is not blocking.
+     * <p>
+     * This variation allows passing a {@link MessageId} rather than a {@link Message}
+     * object, in order to avoid keeping the payload in memory for an extended amount
+     * of time.
+     *
+     * @see #negativeAcknowledge(Message)
+     *
+     * @param messageId
+     *            The {@code MessageId} to be negatively acknowledged
+     */
+    void negativeAcknowledge(MessageId messageId);
+
+    /**
      * Acknowledge the reception of all the messages in the stream up to (and including) the provided message.
      *
      * This method will block until the acknowledge has been sent to the broker. After that, the messages will not be
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index 09ad5eb..8cb50f9 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -188,6 +188,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
     ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);
 
     /**
+     * Set the delay to wait before re-delivering messages that have failed to be processed.
+     * <p>
+     * When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
+     * will be redelivered after a fixed timeout. The default is 1 min.
+     *
+     * @param redeliveryDelay
+     *            redelivery delay for failed messages
+     * @param timeUnit
+     *            unit in which the timeout is provided.
+     * @return the consumer builder instance
+     * @see Consumer#negativeAcknowledge(Message)
+     */
+    ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit);
+
+    /**
      * Select the subscription type to be used when subscribing to the topic.
      * <p>
      * Options are:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index d8a0de3..1d49512 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -255,6 +255,11 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
         return doAcknowledge(messageId, AckType.Cumulative, Collections.emptyMap());
     }
 
+    @Override
+    public void negativeAcknowledge(Message<?> message) {
+        negativeAcknowledge(message.getMessageId());
+    }
+
     abstract protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
                                                              Map<String,Long> properties);
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 7e809ee..6f8d9c9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -159,13 +159,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
 
     @Override
     public ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit) {
-        checkArgument(timeUnit.toMillis(ackTimeout) >= MIN_ACK_TIMEOUT_MILLIS,
+        checkArgument(ackTimeout == 0 || timeUnit.toMillis(ackTimeout) >= MIN_ACK_TIMEOUT_MILLIS,
                 "Ack timeout should be should be greater than " + MIN_ACK_TIMEOUT_MILLIS + " ms");
         conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
         return this;
     }
 
     @Override
+    public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) {
+        conf.setNegativeAckRedeliveryDelayMicros(timeUnit.toMicros(redeliveryDelay));
+        return this;
+    }
+
+    @Override
     public ConsumerBuilder<T> subscriptionType(@NonNull SubscriptionType subscriptionType) {
         conf.setSubscriptionType(subscriptionType);
         return this;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 4145cb9..7c3f0ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -111,6 +111,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private final UnAckedMessageTracker unAckedMessageTracker;
     private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
+    private final NegativeAcksTracker negativeAcksTracker;
 
     protected final ConsumerStatsRecorder stats;
     private final int priorityLevel;
@@ -174,6 +175,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.priorityLevel = conf.getPriorityLevel();
         this.readCompacted = conf.isReadCompacted();
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
+        this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -183,7 +185,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
         if (conf.getAckTimeoutMillis() != 0) {
             if (conf.getTickDurationMillis() > 0) {
-                this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
+                this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(),
+                        Math.min(conf.getTickDurationMillis(), conf.getAckTimeoutMillis()));
             } else {
                 this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
             }
@@ -454,6 +457,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     @Override
+    public void negativeAcknowledge(MessageId messageId) {
+        negativeAcksTracker.add(messageId);
+
+        // Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
+        unAckedMessageTracker.remove(messageId);
+    }
+
+    @Override
     public void connectionOpened(final ClientCnx cnx) {
         setClientCnx(cnx);
         cnx.registerConsumer(consumerId, this);
@@ -1213,6 +1224,10 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     @Override
     public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        if (messageIds.isEmpty()) {
+            return;
+        }
+
         checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
 
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 80532e1..71af9c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -406,6 +406,15 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     @Override
+    public void negativeAcknowledge(MessageId messageId) {
+        checkArgument(messageId instanceof TopicMessageIdImpl);
+        TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
+
+        ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
+        consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
+    }
+
+    @Override
     public CompletableFuture<Void> unsubscribeAsync() {
         if (getState() == State.Closing || getState() == State.Closed) {
             return FutureUtil.failedFuture(
@@ -526,6 +535,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     @Override
     public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        if (messageIds.isEmpty()) {
+            return;
+        }
+
         checkArgument(messageIds.stream().findFirst().get() instanceof TopicMessageIdImpl);
 
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
new file mode 100644
index 0000000..304546e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/NegativeAcksTracker.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.Timeout;
+import io.netty.util.Timer;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+
+class NegativeAcksTracker {
+
+    private HashMap<MessageId, Long> nackedMessages = null;
+
+    private final ConsumerBase<?> consumer;
+    private final Timer timer;
+    private final long nackDelayNanos;
+    private final long timerIntervalNanos;
+
+    private Timeout timeout;
+
+    // Set a min delay to allow for grouping nacks within a single batch
+    private static final long MIN_NACK_DELAY_NANOS = TimeUnit.MILLISECONDS.toNanos(100);
+
+    public NegativeAcksTracker(ConsumerBase<?> consumer, ConsumerConfigurationData<?> conf) {
+        this.consumer = consumer;
+        this.timer = ((PulsarClientImpl) consumer.getClient()).timer();
+        this.nackDelayNanos = Math.max(TimeUnit.MICROSECONDS.toNanos(conf.getNegativeAckRedeliveryDelayMicros()),
+                MIN_NACK_DELAY_NANOS);
+        this.timerIntervalNanos = nackDelayNanos / 3;
+    }
+
+    private synchronized void triggerRedelivery(Timeout t) {
+        if (nackedMessages.isEmpty()) {
+            this.timeout = null;
+            return;
+        }
+
+        // Group all the nacked messages into one single re-delivery request
+        Set<MessageId> messagesToRedeliver = new HashSet<>();
+        long now = System.nanoTime();
+        nackedMessages.forEach((msgId, timestamp) -> {
+            if (timestamp < now) {
+                messagesToRedeliver.add(msgId);
+            }
+        });
+
+        messagesToRedeliver.forEach(nackedMessages::remove);
+        consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
+
+        this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
+    }
+
+    public synchronized void add(MessageId messageId) {
+        if (messageId instanceof BatchMessageIdImpl) {
+            BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
+            messageId = new MessageIdImpl(batchMessageId.getLedgerId(), batchMessageId.getEntryId(),
+                    batchMessageId.getPartitionIndex());
+        }
+
+        if (nackedMessages == null) {
+            nackedMessages = new HashMap<>();
+        }
+        nackedMessages.put(messageId, System.nanoTime() + nackDelayNanos);
+
+        if (this.timeout == null) {
+            // Schedule a task and group all the redeliveries for the same period. Leave a small buffer so that
+            // nacks immediately following the current one are batched into the same redelivery request.
+            this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 0f61cbb..aab5234 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -63,6 +63,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
 
     private long acknowledgementsGroupTimeMicros = TimeUnit.MILLISECONDS.toMicros(100);
 
+    private long negativeAckRedeliveryDelayMicros = TimeUnit.MINUTES.toMicros(1);
+
     private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
 
     private String consumerName = null;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index 89b842f..b96f971 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -429,6 +429,14 @@ public class PulsarConsumerSourceTests {
         }
 
         @Override
+        public void negativeAcknowledge(Message<?> message) {
+        }
+
+        @Override
+        public void negativeAcknowledge(MessageId messageId) {
+        }
+
+        @Override
         public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
 
         }


Mime
View raw message