pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] lovelle commented on a change in pull request #942: Bugfix: duplicate messages for PartitionedConsumers.
Date Thu, 01 Jan 1970 00:00:00 GMT
lovelle commented on a change in pull request #942: Bugfix: duplicate messages for PartitionedConsumers.
URL: https://github.com/apache/incubator-pulsar/pull/942#discussion_r155426435
 
 

 ##########
 File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
 ##########
 @@ -191,7 +200,9 @@ protected Message internalReceive(int timeout, TimeUnit unit) throws
PulsarClien
         Message message;
         try {
             lock.writeLock().lock();
-            message = incomingMessages.poll(0, TimeUnit.SECONDS);
+            message = (incomingMessages.size() > 0) ?
+                    incomingMessages.poll(0, TimeUnit.SECONDS) : messageFromConsumerImpl();
 
 Review comment:
   Great! You are right, but isn't `internalReceiveAsync` being called for `receiveAsync`?
In such case, if `incomingMessages` is empty `resumeReceivingFromPausedConsumersIfNeeded`
won't be called and there could be some messages in consumers. 
   
   I think just changing the if condition for something like these will be ok
   ```java
   message = incomingMessages.poll(0, TimeUnit.SECONDS);
   
   if (message == null && numMessagesInQueue() == 0) {
       pendingReceives.add(result);
   } else {
       resumeReceivingFromPausedConsumersIfNeeded();
       result.complete(message);
   }
   ```
   
   what do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message