pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhai...@apache.org
Subject [pulsar] branch branch-2.5 updated: Fix producer stucks on creating ledger timeout (#7319)
Date Tue, 23 Jun 2020 07:25:57 GMT
This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new afd2b07  Fix producer stucks on creating ledger timeout (#7319)
afd2b07 is described below

commit afd2b07ea78e49eb864e7611cacf062d33611dbe
Author: Sijie Guo <sijie@apache.org>
AuthorDate: Mon Jun 22 13:32:28 2020 -0700

    Fix producer stucks on creating ledger timeout (#7319)
    
    * Fix producer stucks on creating ledger timeout
    
    *Motivation*
    
    The `ledgerCreated` flag is passed as ctx to the createLedger callback.
    The callback already has the logic for handling the `ledgerCreated` flag, but the timeout
    task also sets the flag to `true` when the timeout happens. This triggers the following
    race condition:
    
    a) The createComplete callback is triggered when the timeout fires, but the pending add
    requests are not errored out.
    b) If the createComplete callback eventually completes, it will see that the `ledgerCreated`
    flag is set to true, which causes `checkAndCompleteLedgerOpTask` to return true and the
    callback to exit too early without processing the pending add requests.
    
    This race condition only happens when creating ledger times out.
    
    ```
        public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx)
{
            if (log.isDebugEnabled()) {
                log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId()
: -1);
            }
    
            if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
                return;
            }
    ```
    
    *Modification*
    
    The timeout logic shouldn't modify the `ledgerCreated` context. It should let the callback
    itself process the `ledgerCreated` context.
    
    * Change to use CAS
    (cherry picked from commit a34f6939f0aa639c37192ea0c9bc7b927a245664)
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java |  8 ++++----
 .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 12ca6cd..3c1ded6 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3091,8 +3091,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback
{
                 digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
         scheduledExecutor.schedule(() -> {
             if (!ledgerCreated.get()) {
-                ledgerCreated.set(true);
-                cb.createComplete(BKException.Code.TimeoutException, null, null);
+                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
             }
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
     }
@@ -3108,14 +3107,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback
{
     protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
         if (ctx != null && ctx instanceof AtomicBoolean) {
             // ledger-creation is already timed out and callback is already completed so,
delete this ledger and return.
-            if (((AtomicBoolean) (ctx)).get()) {
+            if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
+                return false;
+            } else {
                 if (rc == BKException.Code.OK) {
                     log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name,
lh.getId());
                     asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES);
                 }
                 return true;
             }
-            ((AtomicBoolean) ctx).set(true);
         }
         return false;
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index c2886cf..6f83b68 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -95,6 +95,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.impl.MetaStore.Stat;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -2268,16 +2269,18 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(),
any(), any(), any());
         AtomicInteger response = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
-        ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
-            @Override
-            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-                response.set(rc);
-                latch.countDown();
-            }
+        AtomicReference<Object> ctxHolder = new AtomicReference<>();
+        ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
+            response.set(rc);
+            latch.countDown();
+            ctxHolder.set(ctx);
         }, Collections.emptyMap());
 
         latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
         assertEquals(response.get(), BKException.Code.TimeoutException);
+        assertTrue(ctxHolder.get() instanceof AtomicBoolean);
+        AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
+        assertFalse(ledgerCreated.get());
 
         ledger.close();
     }


Mime
View raw message