pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jiazhai commented on issue #3287: ConsumerImpl.redeliverUnacknowledgedMessages() get empty messageids
Date Thu, 03 Jan 2019 02:19:36 GMT
jiazhai commented on issue #3287: ConsumerImpl.redeliverUnacknowledgedMessages() get empty
messageids
URL: https://github.com/apache/pulsar/issues/3287#issuecomment-451041525
 
 
   In 2.2.1 the redeliver is triggered as this code:
   ```
       public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long
ackTimeoutMillis) {
           this.stop();
           timeout = client.timer().newTimeout(new TimerTask() {
               @Override
               public void run(Timeout t) throws Exception {
                   if (isAckTimeout()) {
                       log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
                       Set<MessageId> messageIds = new HashSet<>();
                       oldOpenSet.forEach(messageIds::add);
                       oldOpenSet.clear();
                       consumerBase.redeliverUnacknowledgedMessages(messageIds);   < ===
                   }
                   toggle();
                   timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
               }
           }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
       }
   
   ```
   As the code, messageIds could be empty. and caused the issue.
   
   While in latest 2.3.0, @codelipenghui added some change in PR #3118, and there is a check
of condition `if (messageIds.size() > 0)` now.
   
   ```
      public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase,
long ackTimeoutMillis, long tickDurationInMs) {
           Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis
>= tickDurationInMs);
           this.ackTimeoutMillis = ackTimeoutMillis;
           this.tickDurationInMs = tickDurationInMs;
           ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
           this.readLock = readWriteLock.readLock();
           this.writeLock = readWriteLock.writeLock();
           this.messageIdPartitionMap = new ConcurrentHashMap<>();
           this.timePartitions = new LinkedList<>();
   
           int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
           for (int i = 0; i < blankPartitions + 1; i++) {
               timePartitions.add(new ConcurrentOpenHashSet<>());
           }
   
           timeout = client.timer().newTimeout(new TimerTask() {
               @Override
               public void run(Timeout t) throws Exception {
                   Set<MessageId> messageIds = new HashSet<>();
                   writeLock.lock();
                   try {
                       timePartitions.addLast(new ConcurrentOpenHashSet<>());
                       ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
                       if (!headPartition.isEmpty()) {
                           log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
                           headPartition.forEach(messageId -> {
                               messageIds.add(messageId);
                               messageIdPartitionMap.remove(messageId);
                           });
                       }
                   } finally {
                       writeLock.unlock();
                   }
                   if (messageIds.size() > 0) {   < ===
                       consumerBase.redeliverUnacknowledgedMessages(messageIds);
                   }
                   timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
               }
           }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
       }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message