pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #963: Partitioned Consumer UnAckedMessageTracker
Date Thu, 01 Jan 1970 00:00:00 GMT
merlimat closed pull request #963: Partitioned Consumer UnAckedMessageTracker
URL: https://github.com/apache/incubator-pulsar/pull/963
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it is merged,
the full diff is reproduced below for the sake of provenance:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 444f86a85..e9cecbb1a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -348,7 +348,8 @@ public void testFailoverAckedNormalTopic() throws Exception {
     }
 
     private static long getUnackedMessagesCountInPartitionedConsumer(Consumer c) {
-        return ((PartitionedConsumerImpl) c).getConsumers().stream()
+        PartitionedConsumerImpl pc = (PartitionedConsumerImpl) c;
+        return pc.getUnAckedMessageTracker().size() + pc.getConsumers().stream()
                 .mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum();
     }
 
@@ -417,6 +418,7 @@ public void testSharedAckedPartitionedTopic() throws Exception {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
+        ((PartitionedConsumerImpl) consumer).getUnAckedMessageTracker().toggle();
         ((PartitionedConsumerImpl) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
 
         // 8. producer publish more messages
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 19f922a8a..80d8296d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -769,8 +769,8 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC
             uncompressedPayload.release();
             msgMetadata.recycle();
 
+            lock.readLock().lock();
             try {
-                lock.readLock().lock();
                 // Enqueue the message so that it can be retrieved when application calls
receive()
                 // if the conf.getReceiverQueueSize() is 0 then discard message if no one
is waiting for it.
                 // if asyncReceive is waiting then notify callback without adding to incomingMessages
queue
@@ -916,12 +916,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc
                 final MessageImpl message = new MessageImpl(batchMessageIdImpl, msgMetadata,
                         singleMessageMetadataBuilder.build(), singleMessagePayload, cnx);
                 lock.readLock().lock();
-                if (pendingReceives.isEmpty()) {
-                    incomingMessages.add(message);
-                } else {
-                    notifyPendingReceivedCallback(message, null);
+                try {
+                    if (pendingReceives.isEmpty()) {
+                        incomingMessages.add(message);
+                    } else {
+                        notifyPendingReceivedCallback(message, null);
+                    }
+                } finally {
+                    lock.readLock().unlock();
                 }
-                lock.readLock().unlock();
                 singleMessagePayload.release();
                 singleMessageMetadataBuilder.recycle();
             }
@@ -965,7 +968,12 @@ protected synchronized void messageProcessed(Message msg) {
             if (id instanceof BatchMessageIdImpl) {
                 id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
             }
-            unAckedMessageTracker.add(id);
+            if (partitionIndex != -1) {
+                // we should no longer track this message, PartitionedConsumerImpl will take care from now onwards
+                unAckedMessageTracker.remove(id);
+            } else {
+                unAckedMessageTracker.add(id);
+            }
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 3a1986380..bc80b9089 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -20,9 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
@@ -32,12 +30,15 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.naming.DestinationName;
@@ -61,16 +62,23 @@
     private final int numPartitions;
     private final ReadWriteLock lock = new ReentrantReadWriteLock();
     private final ConsumerStats stats;
+    private final UnAckedMessageTracker unAckedMessageTracker;
 
     PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration
conf,
             int numPartitions, ExecutorService listenerExecutor, CompletableFuture<Consumer>
subscribeFuture) {
-        super(client, topic, subscription, conf, Math.max(numPartitions, conf.getReceiverQueueSize()),
listenerExecutor,
+        super(client, topic, subscription, conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()),
listenerExecutor,
                 subscribeFuture);
         this.consumers = Lists.newArrayListWithCapacity(numPartitions);
         this.pausedConsumers = new ConcurrentLinkedQueue<>();
         this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
         this.numPartitions = numPartitions;
 
+        if (conf.getAckTimeoutMillis() != 0) {
+            this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
+        } else {
+            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
+        }
+
         stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats()
: null;
         checkArgument(conf.getReceiverQueueSize() > 0,
                 "Receiver queue size needs to be greater than 0 for Partitioned Topics");
@@ -130,31 +138,46 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) {
             // Process the message, add to the queue and trigger listener or async callback
             messageReceived(message);
 
-            if (incomingMessages.size() >= maxReceiverQueueSize
-                    || (incomingMessages.size() > sharedQueueResumeThreshold &&
!pausedConsumers.isEmpty())) {
-                // mark this consumer to be resumed later: if No more space left in shared
queue,
-                // or if any consumer is already paused (to create fair chance for already
paused consumers)
-                pausedConsumers.add(consumer);
-            } else {
-                // Schedule next receiveAsync() if the incoming queue is not full. Use a
different thread to avoid
-                // recursion and stack overflow
-                client.eventLoopGroup().execute(() -> {
-                    receiveMessageFromConsumer(consumer);
-                });
+            // we're modifying pausedConsumers
+            lock.writeLock().lock();
+            try {
+                int size = incomingMessages.size();
+                if (size >= maxReceiverQueueSize
+                        || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty()))
{
+                    // mark this consumer to be resumed later: if No more space left in shared
queue,
+                    // or if any consumer is already paused (to create fair chance for already
paused consumers)
+                    pausedConsumers.add(consumer);
+                } else {
+                    // Schedule next receiveAsync() if the incoming queue is not full. Use
a different thread to avoid
+                    // recursion and stack overflow
+                    client.eventLoopGroup().execute(() -> {
+                        receiveMessageFromConsumer(consumer);
+                    });
+                }
+            } finally {
+                lock.writeLock().unlock();
             }
         });
     }
 
     private void resumeReceivingFromPausedConsumersIfNeeded() {
-        if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty())
{
-            while (true) {
-                ConsumerImpl consumer = pausedConsumers.poll();
-                if (consumer == null) {
-                    break;
-                }
+        lock.readLock().lock();
+        try {
+            if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty())
{
+                while (true) {
+                    ConsumerImpl consumer = pausedConsumers.poll();
+                    if (consumer == null) {
+                        break;
+                    }
 
-                receiveMessageFromConsumer(consumer);
+                    // if messages are readily available on consumer we will attempt to writeLock
on the same thread
+                    client.eventLoopGroup().execute(() -> {
+                        receiveMessageFromConsumer(consumer);
+                    });
+                }
             }
+        } finally {
+            lock.readLock().unlock();
         }
     }
 
@@ -163,6 +186,7 @@ protected Message internalReceive() throws PulsarClientException {
         Message message;
         try {
             message = incomingMessages.take();
+            unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
         } catch (InterruptedException e) {
@@ -176,6 +200,9 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
         Message message;
         try {
             message = incomingMessages.poll(timeout, unit);
+            if (message != null) {
+                unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
+            }
             resumeReceivingFromPausedConsumersIfNeeded();
             return message;
         } catch (InterruptedException e) {
@@ -186,8 +213,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
 
     @Override
     protected CompletableFuture<Message> internalReceiveAsync() {
-
-        CompletableFuture<Message> result = new CompletableFuture<Message>();
+        CompletableFuture<Message> result = new CompletableFuture<>();
         Message message;
         try {
             lock.writeLock().lock();
@@ -195,6 +221,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
             if (message == null) {
                 pendingReceives.add(result);
             } else {
+                unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
                 resumeReceivingFromPausedConsumersIfNeeded();
                 result.complete(message);
             }
@@ -222,7 +249,8 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
         } else {
 
             ConsumerImpl consumer = consumers.get(((MessageIdImpl) messageId).getPartitionIndex());
-            return consumer.doAcknowledge(messageId, ackType);
+            return consumer.doAcknowledge(messageId, ackType).thenRun(() ->
+                    unAckedMessageTracker.remove((MessageIdImpl) messageId));
         }
 
     }
@@ -247,6 +275,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
                     if (completed.decrementAndGet() == 0) {
                         if (unsubscribeFail.get() == null) {
                             setState(State.Closed);
+                            unAckedMessageTracker.close();
                             unsubscribeFuture.complete(null);
                             log.info("[{}] [{}] Unsubscribed Partitioned Consumer", topic,
subscription);
                         } else {
@@ -268,8 +297,8 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-
         if (getState() == State.Closing || getState() == State.Closed) {
+            unAckedMessageTracker.close();
             return CompletableFuture.completedFuture(null);
         }
         setState(State.Closing);
@@ -286,6 +315,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien
                     if (completed.decrementAndGet() == 0) {
                         if (closeFail.get() == null) {
                             setState(State.Closed);
+                            unAckedMessageTracker.close();
                             closeFuture.complete(null);
                             log.info("[{}] [{}] Closed Partitioned Consumer", topic, subscription);
                             client.cleanupConsumer(this);
@@ -350,8 +380,9 @@ void connectionOpened(ClientCnx cnx) {
     }
 
     void messageReceived(Message message) {
-        lock.readLock().lock();
+        lock.writeLock().lock();
         try {
+            unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Received message from partitioned-consumer {}", topic,
subscription, message.getMessageId());
             }
@@ -362,12 +393,13 @@ void messageReceived(Message message) {
             } else {
                 // Enqueue the message so that it can be retrieved when application calls
receive()
                 // Waits for the queue to have space for the message
+                // This should never block because PartitionedConsumerImpl should always use GrowableArrayBlockingQueue
                 incomingMessages.put(message);
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         } finally {
-            lock.readLock().unlock();
+            lock.writeLock().unlock();
         }
 
         if (listener != null) {
@@ -418,24 +450,29 @@ private ConsumerConfiguration getInternalConsumerConfig() {
 
     @Override
     public void redeliverUnacknowledgedMessages() {
-        for (ConsumerImpl c : consumers) {
-            c.redeliverUnacknowledgedMessages();
+        synchronized (this) {
+            for (ConsumerImpl c : consumers) {
+                c.redeliverUnacknowledgedMessages();
+            }
+            incomingMessages.clear();
+            unAckedMessageTracker.clear();
+            resumeReceivingFromPausedConsumersIfNeeded();
         }
     }
 
     @Override
     public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
-        for (ConsumerImpl c : consumers) {
-            Set<MessageIdImpl> consumerMessageIds = new HashSet<>();
-            messageIds.removeIf(messageId -> {
-                if (messageId.getPartitionIndex() == c.getPartitionIndex()) {
-                    consumerMessageIds.add(messageId);
-                    return true;
-                }
-                return false;
-            });
-            c.redeliverUnacknowledgedMessages(consumerMessageIds);
+        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+            // We cannot redeliver single messages if subscription type is not Shared
+            redeliverUnacknowledgedMessages();
+            return;
         }
+        removeExpiredMessagesFromQueue(messageIds);
+        messageIds.stream()
+                .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet()))
+                .forEach((partitionIndex, messageIds1) ->
+                        consumers.get(partitionIndex).redeliverUnacknowledgedMessages(messageIds1));
+        resumeReceivingFromPausedConsumersIfNeeded();
     }
 
     @Override
@@ -498,5 +535,30 @@ public synchronized ConsumerStats getStats() {
         return stats;
     }
 
+    public UnAckedMessageTracker getUnAckedMessageTracker() {
+        return unAckedMessageTracker;
+    }
+
+    private void removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
+        Message peek = incomingMessages.peek();
+        if (peek != null) {
+            if (!messageIds.contains((MessageIdImpl) peek.getMessageId())) {
+                // first message is not expired, then no message is expired in queue.
+                return;
+            }
+
+            // try not to remove elements that are added while we remove
+            Message message = incomingMessages.poll();
+            while (message != null) {
+                MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
+                if (!messageIds.contains(messageId)) {
+                    messageIds.add(messageId);
+                    break;
+                }
+                message = incomingMessages.poll();
+            }
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PartitionedConsumerImpl.class);
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message