Return-Path: X-Original-To: apmail-bookkeeper-commits-archive@www.apache.org Delivered-To: apmail-bookkeeper-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A036E103ED for ; Tue, 9 Dec 2014 18:59:16 +0000 (UTC) Received: (qmail 15084 invoked by uid 500); 9 Dec 2014 18:59:16 -0000 Delivered-To: apmail-bookkeeper-commits-archive@bookkeeper.apache.org Received: (qmail 15054 invoked by uid 500); 9 Dec 2014 18:59:16 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 15045 invoked by uid 99); 9 Dec 2014 18:59:16 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Dec 2014 18:59:16 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 190E6A215A5; Tue, 9 Dec 2014 18:59:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ivank@apache.org To: commits@bookkeeper.apache.org Message-Id: <736c9ba540e3485ba93eb1fd9cd5f124@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: bookkeeper git commit: BOOKKEEPER-795: Race condition causes writes to hang if ledger is fenced (sijie via ivank) Date: Tue, 9 Dec 2014 18:59:16 +0000 (UTC) Repository: bookkeeper Updated Branches: refs/heads/master 24591f151 -> e79f8736a BOOKKEEPER-795: Race condition causes writes to hang if ledger is fenced (sijie via ivank) Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/e79f8736 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/e79f8736 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/e79f8736 Branch: refs/heads/master Commit: 
e79f8736a7dfc2c7191455e4e88b85a85f0472bf Parents: 24591f1 Author: Ivan Kelly Authored: Tue Dec 9 19:57:43 2014 +0100 Committer: Ivan Kelly Committed: Tue Dec 9 19:57:43 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/bookkeeper/client/LedgerHandle.java | 54 ++++++++++++++++---- .../bookkeeper/client/LedgerCloseTest.java | 37 ++++++++++++++ .../replication/AuditorRollingRestartTest.java | 2 +- .../apache/bookkeeper/test/TestCallbacks.java | 18 +++++++ 5 files changed, 102 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e99d829..2edba29 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -24,6 +24,8 @@ Trunk (unreleased changes) BOOKKEEPER-804: Client program is not terminated when using openLedgerNoRecovery (ivank via sijie) + BOOKKEEPER-795: Race condition causes writes to hang if ledger is fenced (sijie via ivank) + IMPROVEMENTS: BOOKKEEPER-800: Expose whether a ledger is closed or not (ivank) http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 0265990..8cc00eb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -254,7 +254,7 @@ public class LedgerHandle { /** * Has the ledger been closed? 
*/ - public boolean isClosed() { + public synchronized boolean isClosed() { return metadata.isClosed(); } @@ -286,13 +286,32 @@ public class LedgerHandle { final State prevState; List pendingAdds; - synchronized(LedgerHandle.this) { - // if the metadata is already closed, we don't need to proceed the process - // otherwise, it might end up encountering bad version error log messages when updating metadata - if (metadata.isClosed()) { - cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx); - return; + if (isClosed()) { + // TODO: make ledger metadata immutable + // The metadata is already closed, so we don't need to proceed with the zookeeper metadata update, but + // we still need to error out the pending add ops. + // + // There is a race condition in which a pending add op is enqueued after a close op resets the ledger metadata state + // to unclosed to resolve metadata conflicts. If we don't error out these pending add ops, they would be + // leaked and their callbacks would never be invoked. + // + // The race condition happens in the following sequence: + // a) ledger L is fenced + // b) writing entry E encounters LedgerFencedException, triggering the ledger close procedure + // c) ledger close encounters a metadata version exception and sets the ledger metadata back to open + // d) writer tries to write entry E+1, since ledger metadata is still open (reset by c)) + // e) the close procedure in c) resolves the metadata conflicts and sets ledger metadata to closed + // f) writing entry E+1 encounters LedgerFencedException, which enters the ledger close procedure + // g) it finds that the ledger metadata is closed, so it calls back immediately without erroring out any pending adds + synchronized (LedgerHandle.this) { + pendingAdds = drainPendingAddsToErrorOut(); } + errorOutPendingAdds(rc, pendingAdds); + cb.closeComplete(BKException.Code.OK, LedgerHandle.this, ctx); + return; + } + + synchronized(LedgerHandle.this) { prevState = metadata.getState(); prevLastEntryId = metadata.getLastEntryId(); prevLength = 
metadata.getLength(); @@ -549,9 +568,24 @@ public class LedgerHandle { } if (wasClosed) { - LOG.warn("Attempt to add to closed ledger: {}", ledgerId); - cb.addComplete(BKException.Code.LedgerClosedException, - LedgerHandle.this, INVALID_ENTRY_ID, ctx); + // make sure the callback is triggered in main worker pool + try { + bk.mainWorkerPool.submit(new SafeRunnable() { + @Override + public void safeRun() { + LOG.warn("Attempt to add to closed ledger: {}", ledgerId); + cb.addComplete(BKException.Code.LedgerClosedException, + LedgerHandle.this, INVALID_ENTRY_ID, ctx); + } + @Override + public String toString() { + return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId); + } + }); + } catch (RejectedExecutionException e) { + cb.addComplete(bk.getReturnRc(BKException.Code.InterruptedException), + LedgerHandle.this, INVALID_ENTRY_ID, ctx); + } return; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java index eef56b0..6655e9e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerCloseTest.java @@ -19,9 +19,13 @@ package org.apache.bookkeeper.client; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.bookie.Bookie; @@ -34,11 +38,13 @@ import 
org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.test.TestCallbacks.AddCallbackFuture; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.junit.Assert.*; +import static com.google.common.base.Charsets.UTF_8; /** * This class tests the ledger close logic. @@ -226,4 +232,35 @@ public class LedgerCloseTest extends BookKeeperClusterTestCase { bsConfs.add(conf); bs.add(startBookie(conf, dBookie)); } + + @Test(timeout = 60000) + public void testAllWritesAreCompletedOnClosedLedger() throws Exception { + for (int i = 0; i < 100; i++) { + LOG.info("Iteration {}", i); + + List futures = new ArrayList(); + LedgerHandle w = bkc.createLedger(DigestType.CRC32, new byte[0]); + AddCallbackFuture f = new AddCallbackFuture(0L); + w.asyncAddEntry("foobar".getBytes(UTF_8), f, null); + f.get(); + + LedgerHandle r = bkc.openLedger(w.getId(), DigestType.CRC32, new byte[0]); + for (int j = 0; j < 100; j++) { + AddCallbackFuture f1 = new AddCallbackFuture(1L + j); + w.asyncAddEntry("foobar".getBytes(), f1, null); + futures.add(f1); + } + + for (AddCallbackFuture f2: futures) { + try { + f2.get(10, TimeUnit.SECONDS); + } catch (ExecutionException ee) { + // we don't care about errors + } catch (TimeoutException te) { + LOG.error("Error on waiting completing entry {} : ", f2.getExpectedEntryId(), te); + fail("Should succeed on waiting completing entry " + f2.getExpectedEntryId()); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java ---------------------------------------------------------------------- diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java index 1d4bb00..3e221b4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/AuditorRollingRestartTest.java @@ -55,7 +55,7 @@ public class AuditorRollingRestartTest extends BookKeeperClusterTestCase { LedgerHandle lh = bkc.createLedger(3, 3, DigestType.CRC32, "passwd".getBytes()); for (int i = 0; i < 10; i++) { - lh.asyncAddEntry("foobar".getBytes(), new TestCallbacks.AddCallbackFuture(), null); + lh.asyncAddEntry("foobar".getBytes(), new TestCallbacks.AddCallbackFuture(i), null); } lh.addEntry("foobar".getBytes()); lh.close(); http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/e79f8736/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java index 61217f0..5d9f99b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/TestCallbacks.java @@ -25,11 +25,16 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import com.google.common.util.concurrent.AbstractFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Callbacks implemented with SettableFuture, to be used in tests */ public class TestCallbacks { + + private static final Logger logger = LoggerFactory.getLogger(TestCallbacks.class); + public static class GenericCallbackFuture extends 
AbstractFuture implements GenericCallback { @Override @@ -44,8 +49,21 @@ public class TestCallbacks { public static class AddCallbackFuture extends AbstractFuture implements AddCallback { + + private final long expectedEntryId; + + public AddCallbackFuture(long entryId) { + this.expectedEntryId = entryId; + } + + public long getExpectedEntryId() { + return expectedEntryId; + } + @Override public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) { + logger.info("Add entry {} completed : entryId = {}, rc = {}", + new Object[] { expectedEntryId, entryId, rc }); if (rc != BKException.Code.OK) { setException(BKException.create(rc)); } else {