Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1C23A200D63 for ; Fri, 22 Dec 2017 00:00:09 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 1706D160C12; Thu, 21 Dec 2017 23:00:09 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 185A7160C2C for ; Fri, 22 Dec 2017 00:00:07 +0100 (CET) Received: (qmail 92116 invoked by uid 500); 21 Dec 2017 23:00:07 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 92107 invoked by uid 99); 21 Dec 2017 23:00:07 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 Dec 2017 23:00:07 +0000 From: GitBox To: commits@pulsar.apache.org Subject: [GitHub] merlimat closed pull request #963: Partitioned Consumer UnAckedMessageTracker Message-ID: <151389720673.19849.6811655236927769334.gitbox@gitbox.apache.org> archived-at: Thu, 21 Dec 2017 23:00:09 -0000 merlimat closed pull request #963: Partitioned Consumer UnAckedMessageTracker URL: https://github.com/apache/incubator-pulsar/pull/963 This is a PR merged from a forked repository. 
Because this pull request came from a fork and GitHub hides the original diff once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java index 444f86a85..e9cecbb1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java @@ -348,7 +348,8 @@ public void testFailoverAckedNormalTopic() throws Exception { } private static long getUnackedMessagesCountInPartitionedConsumer(Consumer c) { - return ((PartitionedConsumerImpl) c).getConsumers().stream() + PartitionedConsumerImpl pc = (PartitionedConsumerImpl) c; + return pc.getUnAckedMessageTracker().size() + pc.getConsumers().stream() .mapToLong(consumer -> consumer.getUnAckedMessageTracker().size()).sum(); } @@ -417,6 +418,7 @@ public void testSharedAckedPartitionedTopic() throws Exception { assertEquals(received, 5); // 7. Simulate ackTimeout + ((PartitionedConsumerImpl) consumer).getUnAckedMessageTracker().toggle(); ((PartitionedConsumerImpl) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle()); // 8. 
producer publish more messages diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 19f922a8a..80d8296d6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -769,8 +769,8 @@ void messageReceived(MessageIdData messageId, ByteBuf headersAndPayload, ClientC uncompressedPayload.release(); msgMetadata.recycle(); + lock.readLock().lock(); try { - lock.readLock().lock(); // Enqueue the message so that it can be retrieved when application calls receive() // if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it. // if asyncReceive is waiting then notify callback without adding to incomingMessages queue @@ -916,12 +916,15 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc final MessageImpl message = new MessageImpl(batchMessageIdImpl, msgMetadata, singleMessageMetadataBuilder.build(), singleMessagePayload, cnx); lock.readLock().lock(); - if (pendingReceives.isEmpty()) { - incomingMessages.add(message); - } else { - notifyPendingReceivedCallback(message, null); + try { + if (pendingReceives.isEmpty()) { + incomingMessages.add(message); + } else { + notifyPendingReceivedCallback(message, null); + } + } finally { + lock.readLock().unlock(); } - lock.readLock().unlock(); singleMessagePayload.release(); singleMessageMetadataBuilder.recycle(); } @@ -965,7 +968,12 @@ protected synchronized void messageProcessed(Message msg) { if (id instanceof BatchMessageIdImpl) { id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex()); } - unAckedMessageTracker.add(id); + if (partitionIndex != -1) { + // we should no longer track this message, PartitionedConsumerImpl will take care from now onwards + unAckedMessageTracker.remove(id); + } else { + 
unAckedMessageTracker.add(id); + } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java index 3a1986380..bc80b9089 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java @@ -20,9 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; @@ -32,12 +30,15 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import com.google.common.collect.ImmutableMap; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.util.FutureUtil; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.naming.DestinationName; @@ -61,16 +62,23 @@ private final int numPartitions; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private final ConsumerStats stats; + private final UnAckedMessageTracker unAckedMessageTracker; PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, int numPartitions, ExecutorService listenerExecutor, CompletableFuture subscribeFuture) { - super(client, topic, subscription, conf, 
Math.max(numPartitions, conf.getReceiverQueueSize()), listenerExecutor, + super(client, topic, subscription, conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture); this.consumers = Lists.newArrayListWithCapacity(numPartitions); this.pausedConsumers = new ConcurrentLinkedQueue<>(); this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2; this.numPartitions = numPartitions; + if (conf.getAckTimeoutMillis() != 0) { + this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis()); + } else { + this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED; + } + stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null; checkArgument(conf.getReceiverQueueSize() > 0, "Receiver queue size needs to be greater than 0 for Partitioned Topics"); @@ -130,31 +138,46 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { // Process the message, add to the queue and trigger listener or async callback messageReceived(message); - if (incomingMessages.size() >= maxReceiverQueueSize - || (incomingMessages.size() > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { - // mark this consumer to be resumed later: if No more space left in shared queue, - // or if any consumer is already paused (to create fair chance for already paused consumers) - pausedConsumers.add(consumer); - } else { - // Schedule next receiveAsync() if the incoming queue is not full. 
Use a different thread to avoid - // recursion and stack overflow - client.eventLoopGroup().execute(() -> { - receiveMessageFromConsumer(consumer); - }); + // we're modifying pausedConsumers + lock.writeLock().lock(); + try { + int size = incomingMessages.size(); + if (size >= maxReceiverQueueSize + || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { + // mark this consumer to be resumed later: if No more space left in shared queue, + // or if any consumer is already paused (to create fair chance for already paused consumers) + pausedConsumers.add(consumer); + } else { + // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid + // recursion and stack overflow + client.eventLoopGroup().execute(() -> { + receiveMessageFromConsumer(consumer); + }); + } + } finally { + lock.writeLock().unlock(); } }); } private void resumeReceivingFromPausedConsumersIfNeeded() { - if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { - while (true) { - ConsumerImpl consumer = pausedConsumers.poll(); - if (consumer == null) { - break; - } + lock.readLock().lock(); + try { + if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { + while (true) { + ConsumerImpl consumer = pausedConsumers.poll(); + if (consumer == null) { + break; + } - receiveMessageFromConsumer(consumer); + // if messages are readily available on consumer we will attempt to writeLock on the same thread + client.eventLoopGroup().execute(() -> { + receiveMessageFromConsumer(consumer); + }); + } } + } finally { + lock.readLock().unlock(); } } @@ -163,6 +186,7 @@ protected Message internalReceive() throws PulsarClientException { Message message; try { message = incomingMessages.take(); + unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); return message; } catch (InterruptedException e) { @@ -176,6 +200,9 @@ protected Message 
internalReceive(int timeout, TimeUnit unit) throws PulsarClien Message message; try { message = incomingMessages.poll(timeout, unit); + if (message != null) { + unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); + } resumeReceivingFromPausedConsumersIfNeeded(); return message; } catch (InterruptedException e) { @@ -186,8 +213,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien @Override protected CompletableFuture internalReceiveAsync() { - - CompletableFuture result = new CompletableFuture(); + CompletableFuture result = new CompletableFuture<>(); Message message; try { lock.writeLock().lock(); @@ -195,6 +221,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien if (message == null) { pendingReceives.add(result); } else { + unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); resumeReceivingFromPausedConsumersIfNeeded(); result.complete(message); } @@ -222,7 +249,8 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien } else { ConsumerImpl consumer = consumers.get(((MessageIdImpl) messageId).getPartitionIndex()); - return consumer.doAcknowledge(messageId, ackType); + return consumer.doAcknowledge(messageId, ackType).thenRun(() -> + unAckedMessageTracker.remove((MessageIdImpl) messageId)); } } @@ -247,6 +275,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien if (completed.decrementAndGet() == 0) { if (unsubscribeFail.get() == null) { setState(State.Closed); + unAckedMessageTracker.close(); unsubscribeFuture.complete(null); log.info("[{}] [{}] Unsubscribed Partitioned Consumer", topic, subscription); } else { @@ -268,8 +297,8 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien @Override public CompletableFuture closeAsync() { - if (getState() == State.Closing || getState() == State.Closed) { + unAckedMessageTracker.close(); return CompletableFuture.completedFuture(null); } 
setState(State.Closing); @@ -286,6 +315,7 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClien if (completed.decrementAndGet() == 0) { if (closeFail.get() == null) { setState(State.Closed); + unAckedMessageTracker.close(); closeFuture.complete(null); log.info("[{}] [{}] Closed Partitioned Consumer", topic, subscription); client.cleanupConsumer(this); @@ -350,8 +380,9 @@ void connectionOpened(ClientCnx cnx) { } void messageReceived(Message message) { - lock.readLock().lock(); + lock.writeLock().lock(); try { + unAckedMessageTracker.add((MessageIdImpl) message.getMessageId()); if (log.isDebugEnabled()) { log.debug("[{}][{}] Received message from partitioned-consumer {}", topic, subscription, message.getMessageId()); } @@ -362,12 +393,13 @@ void messageReceived(Message message) { } else { // Enqueue the message so that it can be retrieved when application calls receive() // Waits for the queue to have space for the message + // This should never block cause PartitonedConsumerImpl should always use GrowableArrayBlockingQueue incomingMessages.put(message); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { - lock.readLock().unlock(); + lock.writeLock().unlock(); } if (listener != null) { @@ -418,24 +450,29 @@ private ConsumerConfiguration getInternalConsumerConfig() { @Override public void redeliverUnacknowledgedMessages() { - for (ConsumerImpl c : consumers) { - c.redeliverUnacknowledgedMessages(); + synchronized (this) { + for (ConsumerImpl c : consumers) { + c.redeliverUnacknowledgedMessages(); + } + incomingMessages.clear(); + unAckedMessageTracker.clear(); + resumeReceivingFromPausedConsumersIfNeeded(); } } @Override public void redeliverUnacknowledgedMessages(Set messageIds) { - for (ConsumerImpl c : consumers) { - Set consumerMessageIds = new HashSet<>(); - messageIds.removeIf(messageId -> { - if (messageId.getPartitionIndex() == c.getPartitionIndex()) { - consumerMessageIds.add(messageId); - return 
true; - } - return false; - }); - c.redeliverUnacknowledgedMessages(consumerMessageIds); + if (conf.getSubscriptionType() != SubscriptionType.Shared) { + // We cannot redeliver single messages if subscription type is not Shared + redeliverUnacknowledgedMessages(); + return; } + removeExpiredMessagesFromQueue(messageIds); + messageIds.stream() + .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet())) + .forEach((partitionIndex, messageIds1) -> + consumers.get(partitionIndex).redeliverUnacknowledgedMessages(messageIds1)); + resumeReceivingFromPausedConsumersIfNeeded(); } @Override @@ -498,5 +535,30 @@ public synchronized ConsumerStats getStats() { return stats; } + public UnAckedMessageTracker getUnAckedMessageTracker() { + return unAckedMessageTracker; + } + + private void removeExpiredMessagesFromQueue(Set messageIds) { + Message peek = incomingMessages.peek(); + if (peek != null) { + if (!messageIds.contains((MessageIdImpl) peek.getMessageId())) { + // first message is not expired, then no message is expired in queue. + return; + } + + // try not to remove elements that are added while we remove + Message message = incomingMessages.poll(); + while (message != null) { + MessageIdImpl messageId = (MessageIdImpl) message.getMessageId(); + if (!messageIds.contains(messageId)) { + messageIds.add(messageId); + break; + } + message = incomingMessages.poll(); + } + } + } + private static final Logger log = LoggerFactory.getLogger(PartitionedConsumerImpl.class); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services