From commits-return-14510-archive-asf-public=cust-asf.ponee.io@pulsar.incubator.apache.org Mon Sep 17 06:55:29 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id A8CC7180627 for ; Mon, 17 Sep 2018 06:55:28 +0200 (CEST) Received: (qmail 18220 invoked by uid 500); 17 Sep 2018 04:55:27 -0000 Mailing-List: contact commits-help@pulsar.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pulsar.incubator.apache.org Delivered-To: mailing list commits@pulsar.incubator.apache.org Received: (qmail 18211 invoked by uid 99); 17 Sep 2018 04:55:27 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 17 Sep 2018 04:55:27 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 0C04982E11; Mon, 17 Sep 2018 04:55:27 +0000 (UTC) Date: Mon, 17 Sep 2018 04:55:26 +0000 To: "commits@pulsar.apache.org" Subject: [incubator-pulsar] branch master updated: Add ledger op timeout to avoid topics stuck on ledger-creation (#2535) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153716012688.16972.16456817699199088372@gitbox.apache.org> From: rdhabalia@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pulsar X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 858648eb24aeba9d95a7e83513830afd40cb21fe X-Git-Newrev: d5e88c1ec16df557655e42c9f648a2fd3343d759 X-Git-Rev: d5e88c1ec16df557655e42c9f648a2fd3343d759 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git The following commit(s) were added to refs/heads/master by this push: new d5e88c1 Add ledger op timeout to avoid topics stuck on ledger-creation (#2535) d5e88c1 is described below commit d5e88c1ec16df557655e42c9f648a2fd3343d759 Author: Rajan Dhabalia AuthorDate: Sun Sep 16 21:55:21 2018 -0700 Add ledger op timeout to avoid topics stuck on ledger-creation (#2535) * Add ledger op timeout to avoid topics stuck on ledger-creation * rename to metadataOperationsTimeoutSeconds * add service config for managedLedgerMetadataOperationsTimeoutSeconds --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../bookkeeper/mledger/ManagedLedgerConfig.java | 21 +++ .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 101 +++++++------- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 150 ++++++++++++++------- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 47 ++++++- .../apache/pulsar/broker/ServiceConfiguration.java | 10 ++ .../pulsar/broker/service/BrokerService.java | 2 + 8 files changed, 234 insertions(+), 103 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 8927a85..2277cc8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -378,6 +378,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 # corrupted at bookkeeper and managed-cursor is stuck at that ledger. autoSkipNonRecoverableData=false +# operation timeout while updating managed-ledger metadata. +managedLedgerMetadataOperationsTimeoutSeconds=60 + ### --- Load balancer --- ### # Enable load balancer diff --git a/conf/standalone.conf b/conf/standalone.conf index a68664c..cc8f564 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -318,6 +318,9 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 # corrupted at bookkeeper and managed-cursor is stuck at that ledger. autoSkipNonRecoverableData=false +# operation timeout while updating managed-ledger metadata.
+managedLedgerMetadataOperationsTimeoutSeconds=60 + ### --- Load balancer --- ### loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 698d245..5967453 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -56,6 +56,7 @@ public class ManagedLedgerConfig { private boolean autoSkipNonRecoverableData; private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4); private long offloadAutoTriggerSizeThresholdBytes = -1; + private long metadataOperationsTimeoutSeconds = 60; private DigestType digestType = DigestType.CRC32C; private byte[] password = "".getBytes(Charsets.UTF_8); @@ -511,4 +512,24 @@ public class ManagedLedgerConfig { this.clock = clock; return this; } + + /** + * + * Ledger-Op (Create/Delete) timeout + * + * @return + */ + public long getMetadataOperationsTimeoutSeconds() { + return metadataOperationsTimeoutSeconds; + } + + /** + * Ledger-Op (Create/Delete) timeout after which callback will be completed with failure + * + * @param metadataOperationsTimeoutSeconds + */ + public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOperationsTimeoutSeconds) { + this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds; + return this; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 50b4722..0ac818f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1974,68 +1974,73 @@ public class 
ManagedCursorImpl implements ManagedCursor { void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); - bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), - config.getMetadataAckQuorumSize(), digestType, config.getPassword(), (rc, lh, ctx) -> { - ledger.getExecutor().execute(safeRun(() -> { - ledger.mbean.endCursorLedgerCreateOp(); - if (rc != BKException.Code.OK) { - log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name, - BKException.getMessage(rc)); - callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc))); - return; - } + ledger.asyncCreateLedger(bookkeeper, config, digestType, (rc, lh, ctx) -> { + + if (ledger.checkAndCompleteLedgerOpTask(rc, lh, ctx)) { + return; + } + ledger.getExecutor().execute(safeRun(() -> { + ledger.mbean.endCursorLedgerCreateOp(); + if (rc != BKException.Code.OK) { + log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name, + BKException.getMessage(rc)); + callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc))); + return; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name); + } + // Created the ledger, now write the last position + // content + MarkDeleteEntry mdEntry = lastMarkDeleteEntry; + persistPositionToLedger(lh, mdEntry, new VoidCallback() { + @Override + public void operationComplete() { if (log.isDebugEnabled()) { - log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name); + log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(), + mdEntry.newPosition, name); } - // Created the ledger, now write the last position - // content - MarkDeleteEntry mdEntry = lastMarkDeleteEntry; - persistPositionToLedger(lh, mdEntry, new VoidCallback() { + switchToNewLedger(lh, new VoidCallback() { @Override public void operationComplete() { - 
if (log.isDebugEnabled()) { - log.debug("[{}] Persisted position {} for cursor {}", ledger.getName(), - mdEntry.newPosition, name); - } - switchToNewLedger(lh, new VoidCallback() { - @Override - public void operationComplete() { - callback.operationComplete(); - } - - @Override - public void operationFailed(ManagedLedgerException exception) { - // it means it failed to switch the newly created ledger so, it should be - // deleted to prevent leak - bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> { - if (rc != BKException.Code.OK) { - log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(), - lh.getId()); - } - }, null); - callback.operationFailed(exception); - } - }); + callback.operationComplete(); } @Override public void operationFailed(ManagedLedgerException exception) { - log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(), - mdEntry.newPosition, name); - - ledger.mbean.startCursorLedgerDeleteOp(); - bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() { - @Override - public void deleteComplete(int rc, Object ctx) { - ledger.mbean.endCursorLedgerDeleteOp(); + // it means it failed to switch the newly created ledger so, it should be + // deleted to prevent leak + bookkeeper.asyncDeleteLedger(lh.getId(), (int rc, Object ctx) -> { + if (rc != BKException.Code.OK) { + log.warn("[{}] Failed to delete orphan ledger {}", ledger.getName(), + lh.getId()); } }, null); callback.operationFailed(exception); } }); - })); - }, null, Collections.emptyMap()); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + log.warn("[{}] Failed to persist position {} for cursor {}", ledger.getName(), + mdEntry.newPosition, name); + + ledger.mbean.startCursorLedgerDeleteOp(); + bookkeeper.asyncDeleteLedger(lh.getId(), new DeleteCallback() { + @Override + public void deleteComplete(int rc, Object ctx) { + ledger.mbean.endCursorLedgerDeleteOp(); + } + }, null); + 
callback.operationFailed(exception); + } + }); + })); + }, Collections.emptyMap()); + } private List buildPropertiesMap(Map properties) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f332e17..d33f151 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -22,17 +22,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; -import com.google.common.collect.BoundType; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import com.google.common.collect.Range; -import com.google.common.util.concurrent.RateLimiter; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - import java.time.Clock; import java.util.Collections; import java.util.Iterator; @@ -51,6 +40,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +52,7 @@ import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerHandle; import 
org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.Backoff; @@ -111,6 +102,17 @@ import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.BoundType; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.util.concurrent.RateLimiter; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final static long MegaByte = 1024 * 1024; @@ -185,7 +187,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60; - + enum State { None, // Uninitialized LedgerOpened, // A ledger is ready to write into @@ -364,39 +366,44 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // Create a new ledger to start writing this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), - digestType, config.getPassword(), (rc, lh, ctx) -> { - executor.executeOrdered(name, safeRun(() -> { - mbean.endDataLedgerCreateOp(); - if (rc != BKException.Code.OK) { - callback.initializeFailed(createManagedLedgerException(rc)); - return; - } + + asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> { - log.info("[{}] Created ledger {}", name, lh.getId()); - STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); - currentLedger = lh; - - lastConfirmedEntry = new PositionImpl(lh.getId(), -1); - // bypass empty ledgers, find 
last ledger with Message if possible. - while (lastConfirmedEntry.getEntryId() == -1) { - Map.Entry formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId()); - if (formerLedger != null) { - LedgerInfo ledgerInfo = formerLedger.getValue(); - lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); - } else { - break; - } - } + if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) { + return; + } + + executor.executeOrdered(name, safeRun(() -> { + mbean.endDataLedgerCreateOp(); + if (rc != BKException.Code.OK) { + callback.initializeFailed(createManagedLedgerException(rc)); + return; + } - LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); - ledgers.put(lh.getId(), info); + log.info("[{}] Created ledger {}", name, lh.getId()); + STATE_UPDATER.set(this, State.LedgerOpened); + lastLedgerCreatedTimestamp = clock.millis(); + currentLedger = lh; + + lastConfirmedEntry = new PositionImpl(lh.getId(), -1); + // bypass empty ledgers, find last ledger with Message if possible. 
+ while (lastConfirmedEntry.getEntryId() == -1) { + Map.Entry formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId()); + if (formerLedger != null) { + LedgerInfo ledgerInfo = formerLedger.getValue(); + lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); + } else { + break; + } + } + + LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); + ledgers.put(lh.getId(), info); - // Save it back to ensure all nodes exist - store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb); - })); - }, null, Collections.emptyMap()); + // Save it back to ensure all nodes exist + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb); + })); + }, Collections.emptyMap()); } private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) { @@ -564,9 +571,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), - config.getAckQuorumSize(), digestType, config.getPassword(), this, null, - Collections.emptyMap()); + asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); } } else { checkArgument(state == State.LedgerOpened, "ledger=%s is not opened", state); @@ -1155,6 +1160,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (log.isDebugEnabled()) { log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? 
lh.getId() : -1); } + + if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) { + return; + } + mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc)); @@ -1320,9 +1330,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { STATE_UPDATER.set(this, State.CreatingLedger); this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), - config.getAckQuorumSize(), digestType, config.getPassword(), this, null, - Collections.emptyMap()); + asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); } } @@ -2796,6 +2804,52 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } + /** + * Create ledger async and schedule a timeout task to check ledger-creation is complete else it fails the callback + * with TimeoutException. + * + * @param bookKeeper + * @param config + * @param digestType + * @param cb + * @param emptyMap + */ + protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType, + CreateCallback cb, Map emptyMap) { + AtomicBoolean ledgerCreated = new AtomicBoolean(false); + bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), + digestType, config.getPassword(), cb, ledgerCreated, Collections.emptyMap()); + scheduledExecutor.schedule(() -> { + if (!ledgerCreated.get()) { + ledgerCreated.set(true); + cb.createComplete(BKException.Code.TimeoutException, null, null); + } + }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS); + } + + /** + * check if ledger-op task is already completed by timeout-task. 
If completed then delete the created ledger + * + * @param rc + * @param lh + * @param ctx + * @return + */ + protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) { + if (ctx != null && ctx instanceof AtomicBoolean) { + // ledger-creation is already timed out and callback is already completed so, delete this ledger and return. + if (((AtomicBoolean) (ctx)).get()) { + if (rc == BKException.Code.OK) { + log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId()); + asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES); + } + return true; + } + ((AtomicBoolean) ctx).set(true); + } + return false; + } + private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d7cce97..aa85d80 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -18,6 +18,10 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -25,18 +29,12 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import com.google.common.base.Charsets; -import com.google.common.collect.Sets; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; - import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; import 
java.nio.charset.Charset; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -47,10 +45,15 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; @@ -87,6 +90,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; +import com.google.common.base.Charsets; +import com.google.common.collect.Sets; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; + public class ManagedLedgerTest extends MockedBookKeeperTestCase { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class); @@ -2215,4 +2225,27 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { assertFalse(factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate")); } + + @Test + public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig().setMetadataOperationsTimeoutSeconds(3); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config); + + BookKeeper bk = mock(BookKeeper.class); + doNothing().when(bk).asyncCreateLedger(anyInt(), 
anyInt(), anyInt(), any(), any(), any(), any(), any()); + AtomicInteger response = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(1); + ledger.asyncCreateLedger(bk, config, null, new CreateCallback() { + @Override + public void createComplete(int rc, LedgerHandle lh, Object ctx) { + response.set(rc); + latch.countDown(); + } + }, Collections.emptyMap()); + + latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS); + assertEquals(response.get(), BKException.Code.TimeoutException); + + ledger.close(); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 65696f7..84cc569 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -373,6 +373,8 @@ public class ServiceConfiguration implements PulsarConfiguration { // corrupted at bookkeeper and managed-cursor is stuck at that ledger. @FieldContext(dynamic = true) private boolean autoSkipNonRecoverableData = false; + // operation timeout while updating managed-ledger metadata. 
+ private long managedLedgerMetadataOperationsTimeoutSeconds = 60; /*** --- Load balancer --- ****/ // Enable load balancer @@ -1314,6 +1316,14 @@ public class ServiceConfiguration implements PulsarConfiguration { this.autoSkipNonRecoverableData = skipNonRecoverableLedger; } + public long getManagedLedgerMetadataOperationsTimeoutSeconds() { + return managedLedgerMetadataOperationsTimeoutSeconds; + } + + public void setManagedLedgerMetadataOperationsTimeoutSeconds(long managedLedgerMetadataOperationsTimeoutSeconds) { + this.managedLedgerMetadataOperationsTimeoutSeconds = managedLedgerMetadataOperationsTimeoutSeconds; + } + public boolean isLoadBalancerEnabled() { return loadBalancerEnabled; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8620e82..406d3e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -755,6 +755,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener