pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Fix process of calculating msgBacklog included in stats (#3092)
Date Thu, 29 Nov 2018 17:35:25 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ddaa2f  Fix process of calculating msgBacklog included in stats (#3092)
0ddaa2f is described below

commit 0ddaa2f92c72cc3c1a19fcb90a6412eb0a3799de
Author: massakam <massakam@yahoo-corp.jp>
AuthorDate: Fri Nov 30 02:35:20 2018 +0900

    Fix process of calculating msgBacklog included in stats (#3092)
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |  2 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 31 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 4085f80..35cd1b7 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -694,7 +694,7 @@ public class ManagedCursorImpl implements ManagedCursor {
         long backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(ledger) - messagesConsumedCounter;
         if (backlog < 0) {
             // In some case the counters get incorrect values, fall back to the precise backlog count
-            backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition()));
+            backlog = getNumberOfEntries(Range.closed(markDeletePosition, ledger.getLastPosition())) - 1;
         }
 
         return backlog;
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d8c48de..6d90016 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -28,6 +28,7 @@ import static org.testng.Assert.*;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import java.lang.reflect.Field;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -277,6 +278,36 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
     }
 
     @Test(timeOut = 20000)
+    void testNumberOfEntriesInBacklogWithFallback() throws Exception {
+        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
+
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ManagedCursor c2 = ledger.openCursor("c2");
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        ManagedCursor c3 = ledger.openCursor("c3");
+        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        ManagedCursor c4 = ledger.openCursor("c4");
+        ledger.addEntry("dummy-entry-4".getBytes(Encoding));
+        ManagedCursor c5 = ledger.openCursor("c5");
+
+        Field field = ManagedCursorImpl.class.getDeclaredField("messagesConsumedCounter");
+        field.setAccessible(true);
+        long counter = ((ManagedLedgerImpl) ledger).getEntriesAddedCounter() + 1;
+        field.setLong(c1, counter);
+        field.setLong(c2, counter);
+        field.setLong(c3, counter);
+        field.setLong(c4, counter);
+        field.setLong(c5, counter);
+
+        assertEquals(c1.getNumberOfEntriesInBacklog(), 4);
+        assertEquals(c2.getNumberOfEntriesInBacklog(), 3);
+        assertEquals(c3.getNumberOfEntriesInBacklog(), 2);
+        assertEquals(c4.getNumberOfEntriesInBacklog(), 1);
+        assertEquals(c5.getNumberOfEntriesInBacklog(), 0);
+    }
+
+    @Test(timeOut = 20000)
     void testNumberOfEntriesWithReopen() throws Exception {
         ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
 


Mime
View raw message