pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] merlimat closed pull request #982: Retry on failure of ledger-deletion and avoid running callback in executor
Date Thu, 01 Jan 1970 00:00:00 GMT
merlimat closed pull request #982: Retry on failure of ledger-deletion and avoid running callback in executor
URL: https://github.com/apache/incubator-pulsar/pull/982
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 9d0d5dc2d..bb9fc984d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -80,6 +80,8 @@
 import com.google.common.collect.TreeRangeSet;
 import com.google.common.util.concurrent.RateLimiter;
 import com.google.protobuf.InvalidProtocolBufferException;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
 
 public class ManagedCursorImpl implements ManagedCursor {
 
@@ -267,7 +269,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
             // Read the last entry in the ledger
             lh.asyncReadLastEntry((rc1, lh1, seq, ctx1) -> {
                 if (log.isDebugEnabled()) {
-                    log.debug("readComplete rc={} entryId={}", rc1, lh1.getLastAddConfirmed());
+                    log.debug("[{}} readComplete rc={} entryId={}", ledger.getName(), rc1, lh1.getLastAddConfirmed());
                 }
                 if (isBkErrorNotRecoverable(rc1)) {
                     log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
@@ -984,7 +986,7 @@ private long getNumberOfEntries(Range<PositionImpl> range) {
         long allEntries = ledger.getNumberOfEntries(range);
 
         if (log.isDebugEnabled()) {
-            log.debug("getNumberOfEntries. {} allEntries: {}", range, allEntries);
+            log.debug("[{}] getNumberOfEntries. {} allEntries: {}", ledger.getName(), range, allEntries);
         }
 
         long deletedEntries = 0;
@@ -1007,7 +1009,7 @@ private long getNumberOfEntries(Range<PositionImpl> range) {
         }
 
         if (log.isDebugEnabled()) {
-            log.debug("Found {} entries - deleted: {}", allEntries - deletedEntries, deletedEntries);
+            log.debug("[{}] Found {} entries - deleted: {}", ledger.getName(), allEntries - deletedEntries, deletedEntries);
         }
         return allEntries - deletedEntries;
     }
@@ -1235,7 +1237,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
             readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
 
             if (log.isDebugEnabled()) {
-                log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
+                log.debug("[{}] Moved read position from: {} to: {}", ledger.getName(), oldReadPosition, readPosition);
             }
         }
 
@@ -1258,8 +1260,8 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
             }
 
             if (log.isDebugEnabled()) {
-                log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition,
-                        newMarkDeletePosition, skippedEntries);
+                log.debug("[{}] Moved ack position from: {} to: {} -- skipped: {}", ledger.getName(),
+                        oldMarkDeletePosition, newMarkDeletePosition, skippedEntries);
             }
             messagesConsumedCounter += skippedEntries;
         }
@@ -1664,7 +1666,7 @@ public void rewind() {
             PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
             PositionImpl oldReadPosition = readPosition;
 
-            log.info("[{}] Rewind from {} to {}", name, oldReadPosition, newReadPosition);
+            log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);
 
             readPosition = newReadPosition;
         } finally {
@@ -2156,47 +2158,65 @@ void readOperationCompleted() {
     }
 
     void asyncDeleteLedger(final LedgerHandle lh) {
-        if (lh == null) {
+        asyncDeleteLedger(lh, DEFAULT_LEDGER_DELETE_RETRIES);
+    }
+    
+    private void asyncDeleteLedger(final LedgerHandle lh, int retry) {
+        if (lh == null || retry <= 0) {
+            if (lh != null) {
+                log.warn("[{}-{}] Failed to delete ledger after retries {}", ledger.getName(), name, lh.getId());
+            }
             return;
         }
 
         ledger.mbean.startCursorLedgerDeleteOp();
         bookkeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
-            ledger.getExecutor().submit(safeRun(() -> {
-                ledger.mbean.endCursorLedgerDeleteOp();
-                if (rc != BKException.Code.OK) {
-                    log.warn("[{}] Failed to delete ledger {}: {}", ledger.getName(), lh.getId(),
-                            BKException.getMessage(rc));
-                    return;
-                } else {
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}][{}] Successfully closed&deleted ledger {} in cursor", ledger.getName(), name,
-                                lh.getId());
-                    }
+            ledger.mbean.endCursorLedgerDeleteOp();
+            if (rc != BKException.Code.OK) {
+                log.warn("[{}] Failed to delete ledger {}: {}", ledger.getName(), lh.getId(),
+                        BKException.getMessage(rc));
+                if (rc != BKException.Code.NoSuchLedgerExistsException) {
+                    ledger.getScheduledExecutor().schedule(safeRun(() -> {
+                        asyncDeleteLedger(lh, retry - 1);
+                    }), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
                 }
-            }));
+                return;
+            } else {
+                log.info("[{}][{}] Successfully closed & deleted ledger {} in cursor", ledger.getName(), name,
+                        lh.getId());
+            }
         }, null);
     }
 
     void asyncDeleteCursorLedger() {
+        asyncDeleteCursorLedger(DEFAULT_LEDGER_DELETE_RETRIES);
+    }
+    
+    private void asyncDeleteCursorLedger(int retry) {
         STATE_UPDATER.set(this, State.Closed);
 
-        if (cursorLedger == null) {
-            // No ledger was created
+        if (cursorLedger == null || retry <= 0) {
+            if (cursorLedger != null) {
+                log.warn("[{}-{}] Failed to delete ledger after retries {}", ledger.getName(), name,
+                        cursorLedger.getId());
+            }
             return;
         }
 
         ledger.mbean.startCursorLedgerDeleteOp();
         bookkeeper.asyncDeleteLedger(cursorLedger.getId(), (rc, ctx) -> {
-            ledger.getExecutor().submit(safeRun(() -> {
-                ledger.mbean.endCursorLedgerDeleteOp();
-                if (rc == BKException.Code.OK) {
-                    log.debug("[{}][{}] Deleted cursor ledger", cursorLedger.getId());
-                } else {
-                    log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(),
-                            BKException.getMessage(rc));
+            ledger.mbean.endCursorLedgerDeleteOp();
+            if (rc == BKException.Code.OK) {
+                log.info("[{}][{}] Deleted cursor ledger {}", ledger.getName(), name, cursorLedger.getId());
+            } else {
+                log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(),
+                        BKException.getMessage(rc));
+                if (rc != BKException.Code.NoSuchLedgerExistsException) {
+                    ledger.getScheduledExecutor().schedule(safeRun(() -> {
+                        asyncDeleteCursorLedger(retry - 1);
+                    }), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
                 }
-            }));
+            }
         }, null);
     }
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 596137933..84a4d8e2d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -20,6 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import java.util.Iterator;
@@ -156,6 +157,9 @@
     final static long WaitTimeAfterLedgerCreationFailureMs = 10000;
 
     volatile PositionImpl lastConfirmedEntry;
+    
+    protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
+    protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
 
     enum State {
         None, // Uninitialized
@@ -1590,18 +1594,7 @@ public void operationComplete(Void result, Stat stat) {
 
                     for (LedgerInfo ls : ledgersToDelete) {
                         log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize());
-                        bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx) -> {
-                            if (rc == BKException.Code.NoSuchLedgerExistsException) {
-                                log.warn("[{}] Ledger was already deleted {}", name, ls.getLedgerId());
-                            } else if (rc != BKException.Code.OK) {
-                                log.error("[{}] Error deleting ledger {}", name, ls.getLedgerId(),
-                                        BKException.getMessage(rc));
-                            } else {
-                                if (log.isDebugEnabled()) {
-                                    log.debug("[{}] Deleted ledger {}", name, ls.getLedgerId());
-                                }
-                            }
-                        }, null);
+                        asyncDeleteLedger(ls.getLedgerId());
                     }
                 }
 
@@ -1693,6 +1686,31 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         }
     }
 
+    private void asyncDeleteLedger(long ledgerId) {
+        asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES);
+    }
+
+    private void asyncDeleteLedger(long ledgerId, long retry) {
+        if (retry <= 0) {
+            log.warn("[{}] Failed to delete ledger after retries {}", name, ledgerId);
+            return;
+        }
+        bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> {
+            if (rc == BKException.Code.NoSuchLedgerExistsException) {
+                log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
+            } else if (rc != BKException.Code.OK) {
+                log.error("[{}] Error deleting ledger {}", name, ledgerId, BKException.getMessage(rc));
+                scheduledExecutor.schedule(safeRun(() -> {
+                    asyncDeleteLedger(ledgerId, retry - 1);
+                }), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Deleted ledger {}", name, ledgerId);
+                }
+            }
+        }, null);
+    }
+    
     private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
         List<LedgerInfo> ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values());
         AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message