bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From iv...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-795: Race condition causes writes to hang if ledger is fenced (sijie via ivank)
Date Tue, 09 Dec 2014 18:59:16 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 24591f151 -> e79f8736a


BOOKKEEPER-795: Race condition causes writes to hang if ledger is fenced (sijie via ivank)


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e79f8736
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e79f8736
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e79f8736

Branch: refs/heads/master
Commit: e79f8736a7dfc2c7191455e4e88b85a85f0472bf
Parents: 24591f1
Author: Ivan Kelly <ivan@bleurgh.com>
Authored: Tue Dec 9 19:57:43 2014 +0100
Committer: Ivan Kelly <ivan@bleurgh.com>
Committed: Tue Dec 9 19:57:43 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/bookkeeper/client/LedgerHandle.java  | 54 ++++++++++++++++----
 .../bookkeeper/client/LedgerCloseTest.java      | 37 ++++++++++++++
 .../replication/AuditorRollingRestartTest.java  |  2 +-
 .../apache/bookkeeper/test/TestCallbacks.java   | 18 +++++++
 5 files changed, 102 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e99d829..2edba29 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,8 @@ Trunk (unreleased changes)
 
       BOOKKEEPER-804: Client program is not terminated when using openLedgerNoRecovery (ivank via sijie)
 
+      BOOKKEEPER-795: Race condition causes writes to hang if ledger is fenced (sijie via ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-800: Expose whether a ledger is closed or not (ivank)

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 0265990..8cc00eb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -254,7 +254,7 @@ public class LedgerHandle {
     /**
      * Has the ledger been closed?
      */
-    public boolean isClosed() {
+    public synchronized boolean isClosed() {
         return metadata.isClosed();
     }
 
@@ -286,13 +286,32 @@ public class LedgerHandle {
                 final State prevState;
                 List<PendingAddOp> pendingAdds;
 
-                synchronized(LedgerHandle.this) {
-                    // if the metadata is already closed, we don't need to proceed the process
-                    // otherwise, it might end up encountering bad version error log messages when updating metadata
-                    if (metadata.isClosed()) {
-                        cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
-                        return;
+                if (isClosed()) {
+                    // TODO: make ledger metadata immutable
+                    // Although the metadata is already closed, we don't need to proceed zookeeper metadata update, but
+                    // we still need to error out the pending add ops.
+                    //
+                    // There is a race condition a pending add op is enqueued, after a close op reset ledger metadata state
+                    // to unclosed to resolve metadata conflicts. If we don't error out these pending add ops, they would be
+                    // leak and never callback.
+                    //
+                    // The race condition happen in following sequence:
+                    // a) ledger L is fenced
+                    // b) write entry E encountered LedgerFencedException, trigger ledger close procedure
+                    // c) ledger close encountered metadata version exception and set ledger metadata back to open
+                    // d) writer tries to write entry E+1, since ledger metadata is still open (reset by c))
+                    // e) the close procedure in c) resolved the metadata conflicts and set ledger metadata to closed
+                    // f) writing entry E+1 encountered LedgerFencedException which will enter ledger close procedure
+                    // g) it would find that ledger metadata is closed, then it callbacks immediately without erroring out any pendings
+                    synchronized (LedgerHandle.this) {
+                        pendingAdds = drainPendingAddsToErrorOut();
                     }
+                    errorOutPendingAdds(rc, pendingAdds);
+                    cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx);
+                    return;
+                }
+
+                synchronized(LedgerHandle.this) {
                     prevState = metadata.getState();
                     prevLastEntryId = metadata.getLastEntryId();
                     prevLength = metadata.getLength();
@@ -549,9 +568,24 @@ public class LedgerHandle {
         }
 
         if (wasClosed) {
-            LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
-            cb.addComplete(BKException.Code.LedgerClosedException,
-                           LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            // make sure the callback is triggered in main worker pool
+            try {
+                bk.mainWorkerPool.submit(new SafeRunnable() {
+                    @Override
+                    public void safeRun() {
+                        LOG.warn("Attempt to add to closed ledger: {}", ledgerId);
+                        cb.addComplete(BKException.Code.LedgerClosedException,
+                                LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+                    }
+                    @Override
+                    public String toString() {
+                        return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException),
+                        LedgerHandle.this, INVALID_ENTRY_ID, ctx);
+            }
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
index eef56b0..6655e9e 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java
@@ -19,9 +19,13 @@ package org.apache.bookkeeper.client;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.bookie.Bookie;
@@ -34,11 +38,13 @@ import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.TestCallbacks.AddCallbackFuture;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.junit.Assert.*;
+import static com.google.common.base.Charsets.UTF_8;
 
 /**
  * This class tests the ledger close logic.
@@ -226,4 +232,35 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase {
         bsConfs.add(conf);
         bs.add(startBookie(conf, dBookie));
     }
+
+    @Test(timeout = 60000)
+    public void testAllWritesAreCompletedOnClosedLedger() throws Exception {
+        for (int i = 0; i < 100; i++) {
+            LOG.info("Iteration {}", i);
+
+            List<AddCallbackFuture> futures = new ArrayList<AddCallbackFuture>();
+            LedgerHandle w = bkc.createLedger(DigestType.CRC32, new byte[0]);
+            AddCallbackFuture f = new AddCallbackFuture(0L);
+            w.asyncAddEntry("foobar".getBytes(UTF_8), f, null);
+            f.get();
+
+            LedgerHandle r = bkc.openLedger(w.getId(), DigestType.CRC32, new byte[0]);
+            for (int j = 0; j < 100; j++) {
+                AddCallbackFuture f1 = new AddCallbackFuture(1L + j);
+                w.asyncAddEntry("foobar".getBytes(), f1, null);
+                futures.add(f1);
+            }
+
+            for (AddCallbackFuture f2: futures) {
+                try {
+                    f2.get(10, TimeUnit.SECONDS);
+                } catch (ExecutionException ee) {
+                    // we don't care about errors
+                } catch (TimeoutException te) {
+                    LOG.error("Error on waiting completing entry {} : ", f2.getExpectedEntryId(), te);
+                    fail("Should succeed on waiting completing entry " + f2.getExpectedEntryId());
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
index 1d4bb00..3e221b4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java
@@ -55,7 +55,7 @@ public class AuditorRollingRestartTest extends BookKeeperClusterTestCase {
 
         LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes());
         for (int i = 0; i < 10; i++) {
-            lh.asyncAddEntry("foobar".getBytes(), new TestCallbacks.AddCallbackFuture(), null);
+            lh.asyncAddEntry("foobar".getBytes(), new TestCallbacks.AddCallbackFuture(i), null);
         }
         lh.addEntry("foobar".getBytes());
         lh.close();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
index 61217f0..5d9f99b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java
@@ -25,11 +25,16 @@ import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Callbacks implemented with SettableFuture, to be used in tests
  */
 public class TestCallbacks {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestCallbacks.class);
+
     public static class GenericCallbackFuture<T>
         extends AbstractFuture<T> implements GenericCallback<T> {
         @Override
@@ -44,8 +49,21 @@ public class TestCallbacks {
 
     public static class AddCallbackFuture
         extends AbstractFuture<Long> implements AddCallback {
+
+        private final long expectedEntryId;
+
+        public AddCallbackFuture(long entryId) {
+            this.expectedEntryId = entryId;
+        }
+
+        public long getExpectedEntryId() {
+            return expectedEntryId;
+        }
+
         @Override
         public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
+            logger.info("Add entry {} completed : entryId = {}, rc = {}",
+                    new Object[] { expectedEntryId, entryId, rc });
             if (rc != BKException.Code.OK) {
                 setException(BKException.create(rc));
             } else {


Mime
View raw message