From: GitBox
To: commits@pulsar.apache.org
Reply-To: dev@pulsar.apache.org
Subject: [GitHub] jiazhai commented on issue #3287: ConsumerImpl.redeliverUnacknowledgedMessages() get empty messageids
Message-ID: <154648197647.25256.17001726703751834656.gitbox@gitbox.apache.org>
Date: Thu, 03 Jan 2019 02:19:36 -0000
Content-Type: text/plain; charset=utf-8
Content-Transfer-Encoding: 8bit

jiazhai commented on issue #3287: ConsumerImpl.redeliverUnacknowledgedMessages() get empty messageids
URL: https://github.com/apache/pulsar/issues/3287#issuecomment-451041525

In 2.2.1, redelivery is triggered by this code:

```
public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
    this.stop();
    timeout = client.timer().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout t) throws Exception {
            if (isAckTimeout()) {
                log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
                Set<MessageId> messageIds = new HashSet<>();
                oldOpenSet.forEach(messageIds::add);
                oldOpenSet.clear();
                consumerBase.redeliverUnacknowledgedMessages(messageIds); // <===
            }
            toggle();
            timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
}
```

As the code shows, `messageIds` can be empty when `redeliverUnacknowledgedMessages()` is called, and that caused the issue.

In the latest 2.3.0, @codelipenghui changed this code in PR #3118, and the call is now guarded by the check `if (messageIds.size() > 0)`.

```
public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
    Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs);
    this.ackTimeoutMillis = ackTimeoutMillis;
    this.tickDurationInMs = tickDurationInMs;
    ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    this.readLock = readWriteLock.readLock();
    this.writeLock = readWriteLock.writeLock();
    this.messageIdPartitionMap = new ConcurrentHashMap<>();
    this.timePartitions = new LinkedList<>();

    int blankPartitions = (int) Math.ceil((double) this.ackTimeoutMillis / this.tickDurationInMs);
    for (int i = 0; i < blankPartitions + 1; i++) {
        timePartitions.add(new ConcurrentOpenHashSet<>());
    }

    timeout = client.timer().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout t) throws Exception {
            Set<MessageId> messageIds = new HashSet<>();
            writeLock.lock();
            try {
                timePartitions.addLast(new ConcurrentOpenHashSet<>());
                ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
                if (!headPartition.isEmpty()) {
                    log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
                    headPartition.forEach(messageId -> {
                        messageIds.add(messageId);
                        messageIdPartitionMap.remove(messageId);
                    });
                }
            } finally {
                writeLock.unlock();
            }
            if (messageIds.size() > 0) { // <===
                consumerBase.redeliverUnacknowledgedMessages(messageIds);
            }
            timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
        }
    }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
}
```
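To make the rotation and the empty-set guard in the 2.3.0 code easier to follow, here is a minimal, single-threaded sketch of the same idea. It is not Pulsar's implementation: the class `TimePartitionSketch`, its methods `add`, `ack`, and `tick`, the use of plain `String` ids, and the `Consumer` callback standing in for `redeliverUnacknowledgedMessages` are all hypothetical names chosen for illustration, and the locking and timer scheduling of the real tracker are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of a time-partitioned un-acked tracker; not Pulsar's class. */
class TimePartitionSketch {
    // Oldest partition at the head, newest at the tail.
    private final Deque<Set<String>> timePartitions = new ArrayDeque<>();
    // Which partition currently holds each tracked id (assumed helper map).
    private final Map<String, Set<String>> partitionOf = new HashMap<>();
    // Stand-in for consumerBase.redeliverUnacknowledgedMessages(...).
    private final Consumer<Set<String>> redeliver;

    TimePartitionSketch(long ackTimeoutMillis, long tickDurationMs,
                        Consumer<Set<String>> redeliver) {
        if (tickDurationMs <= 0 || ackTimeoutMillis < tickDurationMs) {
            throw new IllegalArgumentException("need 0 < tick <= ackTimeout");
        }
        this.redeliver = redeliver;
        // Same sizing as the quoted constructor: ceil(timeout / tick) + 1 partitions.
        int blankPartitions = (int) Math.ceil((double) ackTimeoutMillis / tickDurationMs);
        for (int i = 0; i < blankPartitions + 1; i++) {
            timePartitions.add(new HashSet<>());
        }
    }

    /** Start tracking a received message in the newest partition. */
    void add(String messageId) {
        if (partitionOf.containsKey(messageId)) {
            return; // already tracked
        }
        Set<String> tail = timePartitions.peekLast();
        tail.add(messageId);
        partitionOf.put(messageId, tail);
    }

    /** Stop tracking an acknowledged message. */
    void ack(String messageId) {
        Set<String> partition = partitionOf.remove(messageId);
        if (partition != null) {
            partition.remove(messageId);
        }
    }

    /** One timer tick: rotate partitions and redeliver whatever expired. */
    void tick() {
        timePartitions.addLast(new HashSet<>());
        Set<String> expired = new HashSet<>(timePartitions.removeFirst());
        expired.forEach(partitionOf::remove);
        // The guard discussed above: never call redeliver with an empty set.
        if (!expired.isEmpty()) {
            redeliver.accept(expired);
        }
    }
}
```

With an ack timeout of 2000 ms and a tick of 1000 ms the sketch keeps three partitions, so an id added before the first tick is handed to the callback on the third call to `tick()`, and ticks that expire nothing never invoke the callback.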