pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guangn...@apache.org
Subject [pulsar] 05/47: Fix negative un-ack messages in consumer stats (#5929)
Date Sun, 23 Feb 2020 06:17:45 GMT
This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3aa47bc8668c270784174115391a5f8fc913fcc6
Author: lipenghui <penghui@apache.org>
AuthorDate: Mon Jan 6 12:01:08 2020 +0800

    Fix negative un-ack messages in consumer stats (#5929)
    
    Fixes #5755
    
    ### Motivation
    
    Fix the negative un-acked message count reported in consumer stats when maxUnackedMessagesPerConsumer is set to 0
    
    ### Verifying this change
    
    Added unit test
---
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +-
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 106 +++++++++++++++++++++
 2 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index c447809..731ae09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -278,7 +278,7 @@ public class Consumer {
     }
 
     private void incrementUnackedMessages(int ackedMessages) {
-        if (shouldBlockConsumerOnUnackMsgs() && addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages) {
+        if (addAndGetUnAckedMsgs(this, ackedMessages) >= maxUnackedMessages && shouldBlockConsumerOnUnackMsgs()) {
             blockedConsumerOnUnackedMsgs = true;
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
new file mode 100644
index 0000000..4e0f012
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class ConsumerStatsTest extends ProducerConsumerBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setMaxUnackedMessagesPerConsumer(0);
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer() throws PulsarClientException, InterruptedException, PulsarAdminException {
+        Assert.assertEquals(pulsar.getConfiguration().getMaxUnackedMessagesPerConsumer(), 0);
+        final String topicName = "persistent://my-property/my-ns/testConsumerStatsOnZeroMaxUnackedMessagesPerConsumer";
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(1, TimeUnit.SECONDS)
+                .subscriptionName("sub")
+                .subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .create();
+
+        final int messages = 10;
+        for (int i = 0; i < messages; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+
+        int received = 0;
+        for (int i = 0; i < messages; i++) {
+            // don't ack messages here
+            consumer.receive();
+            received++;
+        }
+
+        Assert.assertEquals(received, messages);
+        received = 0;
+
+        TopicStats stats = admin.topics().getStats(topicName);
+        Assert.assertEquals(stats.subscriptions.size(), 1);
+        Assert.assertEquals(stats.subscriptions.entrySet().iterator().next().getValue().consumers.size(), 1);
+        Assert.assertFalse(stats.subscriptions.entrySet().iterator().next().getValue().consumers.get(0).blockedConsumerOnUnackedMsgs);
+        Assert.assertEquals(stats.subscriptions.entrySet().iterator().next().getValue().consumers.get(0).unackedMessages, messages);
+
+        for (int i = 0; i < messages; i++) {
+            consumer.acknowledge(consumer.receive());
+            received++;
+        }
+
+        Assert.assertEquals(received, messages);
+
+        // wait acknowledge send
+        Thread.sleep(2000);
+
+        stats = admin.topics().getStats(topicName);
+
+        Assert.assertFalse(stats.subscriptions.entrySet().iterator().next().getValue().consumers.get(0).blockedConsumerOnUnackedMsgs);
+        Assert.assertEquals(stats.subscriptions.entrySet().iterator().next().getValue().consumers.get(0).unackedMessages, 0);
+    }
+
+}


Mime
View raw message