pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rdhaba...@apache.org
Subject [pulsar] branch master updated: Handle unknown runtime exception while reading entries (#2993)
Date Fri, 16 Nov 2018 08:01:49 GMT
This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c8c288  Handle unknown runtime exception while reading entries (#2993)
2c8c288 is described below

commit 2c8c288a9b5754e6fb551c538c993e4717adf7eb
Author: Rajan Dhabalia <rdhabalia@apache.org>
AuthorDate: Fri Nov 16 00:01:44 2018 -0800

    Handle unknown runtime exception while reading entries (#2993)
    
    * Handle unknown runtime exception while reading entries
    
    * make asyncReadEntry0 private
---
 .../bookkeeper/mledger/impl/EntryCacheImpl.java    | 30 +++++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
index ff80fec..a435c14 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java
@@ -162,6 +162,20 @@ public class EntryCacheImpl implements EntryCache {
     @Override
     public void asyncReadEntry(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback,
             final Object ctx) {
+        try {
+            asyncReadEntry0(lh, position, callback, ctx);
+        } catch (Throwable t) {
+            log.warn("failed to read entries for {}-{}", lh.getId(), position, t);
+            // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
+            // (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from
+            // the bookie)
+            invalidateAllEntries(lh.getId());
+            callback.readEntryFailed(createManagedLedgerException(t), ctx);
+        }
+    }
+    
+    private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEntryCallback callback,
+            final Object ctx) {
         if (log.isDebugEnabled()) {
             log.debug("[{}] Reading entry ledger {}: {}", ml.getName(), lh.getId(), position.getEntryId());
         }
@@ -202,9 +216,23 @@ public class EntryCacheImpl implements EntryCache {
     }
 
     @Override
-    @SuppressWarnings({ "unchecked", "rawtypes" })
     public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
             final ReadEntriesCallback callback, Object ctx) {
+        try {
+            asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
+        } catch (Throwable t) {
+            log.warn("failed to read entries for {}--{}-{}", lh.getId(), firstEntry, lastEntry, t);
+            // invalidate all entries related to ledger from the cache (it might happen if entry gets corrupt
+            // (entry.data is already deallocate due to any race-condition) so, invalidate cache and next time read from
+            // the bookie)
+            invalidateAllEntries(lh.getId());
+            callback.readEntriesFailed(createManagedLedgerException(t), ctx);
+        }
+    }
+    
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
+            final ReadEntriesCallback callback, Object ctx) {
         final long ledgerId = lh.getId();
         final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
         final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry);


Mime
View raw message