Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3C628200D5B for ; Wed, 13 Dec 2017 09:43:37 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3ABED160C24; Wed, 13 Dec 2017 08:43:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8A6C9160C16 for ; Wed, 13 Dec 2017 09:43:35 +0100 (CET) Received: (qmail 50841 invoked by uid 500); 13 Dec 2017 08:43:34 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 50832 invoked by uid 99); 13 Dec 2017 08:43:34 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Dec 2017 08:43:34 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id CC651820DB; Wed, 13 Dec 2017 08:43:33 +0000 (UTC) Date: Wed, 13 Dec 2017 08:43:33 +0000 To: "commits@bookkeeper.apache.org" Subject: [bookkeeper] branch master updated: Client times out requests in batch rather than individually MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <151315461373.29797.6139576414031580242@gitbox.apache.org> From: zhaijia@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: bookkeeper X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 13c303f916b5b82702d73e2377b2caf75500243a X-Git-Newrev: 786da273b0f90c1b35b7244b0408959fd034d411 X-Git-Rev: 786da273b0f90c1b35b7244b0408959fd034d411 X-Git-NotificationType: ref_changed_plus_diff 
X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated archived-at: Wed, 13 Dec 2017 08:43:37 -0000 This is an automated email from the ASF dual-hosted git repository. zhaijia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bookkeeper.git The following commit(s) were added to refs/heads/master by this push: new 786da27 Client times out requests in batch rather than individually 786da27 is described below commit 786da273b0f90c1b35b7244b0408959fd034d411 Author: Ivan Kelly AuthorDate: Wed Dec 13 16:43:25 2017 +0800 Client times out requests in batch rather than individually This patch is a squash of f59b597f, 00fc5cd2, 1658eb61 & 833dc307 from the yahoo-4.3 branch. The change removes the HashedWheelTimer from PerChannelBookieClient and instead runs through all outstanding ops periodically, timing them out as necessary. The motivation for this change is to reduce the number of objects created per add/read request. With HashedWheelTimer, a TimerTask needs to be added to the Timer, which allocates and returns a Timeout object. We had to be careful to cancel this timeout on success. The new approach is simpler, doesn't require any allocation per request, and doesn't require any cancellation. Functionally the end result is the same - very slow requests are timed out. The patch also makes some additions to the original changes (due to the codebase moving since it was originally written). - PendingAddOps are changed to use the same mechanism for quorum timeouts. - The checkTimeout method is moved to the PCBCPool so we can run the check on non-connected PCBC instances. This is necessary for TLS. 
Author: Ivan Kelly Author: Siddharth Boobna Author: Matteo Merli Reviewers: Enrico Olivelli , Jia Zhai This closes #817 from ivankelly/yahoo-bp-9 --- .../apache/bookkeeper/benchmark/BenchBookie.java | 9 +- .../org/apache/bookkeeper/client/BookKeeper.java | 6 +- .../org/apache/bookkeeper/client/LedgerHandle.java | 30 +++++ .../org/apache/bookkeeper/client/PendingAddOp.java | 36 ++---- .../bookkeeper/conf/ClientConfiguration.java | 41 ++++++- .../org/apache/bookkeeper/proto/BookieClient.java | 64 ++++++---- .../proto/DefaultPerChannelBookieClientPool.java | 7 ++ .../bookkeeper/proto/PerChannelBookieClient.java | 135 +++++++++++---------- .../proto/PerChannelBookieClientPool.java | 6 + .../client/TestGetBookieInfoTimeout.java | 10 +- .../apache/bookkeeper/test/BookieClientTest.java | 17 ++- .../org/apache/bookkeeper/test/LoopbackClient.java | 121 ------------------ 12 files changed, 236 insertions(+), 246 deletions(-) diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java index 9c52788..4506091 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java @@ -24,7 +24,10 @@ import io.netty.buffer.Unpooled; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; @@ -33,6 +36,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; import 
org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; @@ -158,9 +162,11 @@ public class BenchBookie { .name("BenchBookieClientScheduler") .numThreads(1) .build(); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("BookKeeperClientScheduler")); ClientConfiguration conf = new ClientConfiguration(); - BookieClient bc = new BookieClient(conf, eventLoop, executor); + BookieClient bc = new BookieClient(conf, eventLoop, executor, scheduler, NullStatsLogger.INSTANCE); LatencyCallback lc = new LatencyCallback(); ThroughputCallback tc = new ThroughputCallback(); @@ -220,6 +226,7 @@ public class BenchBookie { LOG.info("Throughput: " + ((long) entryCount) * 1000 / (endTime - startTime)); bc.close(); + scheduler.shutdown(); eventLoop.shutdownGracefully(); executor.shutdown(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 99e9484..d0f8a64 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -146,6 +146,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { final int explicitLacInterval; final boolean delayEnsembleChange; final boolean reorderReadSequence; + final long addEntryQuorumTimeoutNanos; final Optional readSpeculativeRequestPolicy; final Optional readLACSpeculativeRequestPolicy; @@ -487,9 +488,9 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { this.readLACSpeculativeRequestPolicy = Optional.absent(); } - // initialize bookie client - 
this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool, statsLogger); + this.bookieClient = new BookieClient(conf, this.eventLoopGroup, this.mainWorkerPool, + scheduler, statsLogger); this.bookieWatcher = new BookieWatcher(conf, this.placementPolicy, regClient); if (conf.getDiskWeightBasedPlacementEnabled()) { LOG.info("Weighted ledger placement enabled"); @@ -520,6 +521,7 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { LOG.debug("Explicit LAC Interval : {}", this.explicitLacInterval); } + this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout()); scheduleBookieHealthCheckIfEnabled(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 78f3e75..5c15376 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -45,6 +45,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -103,6 +104,7 @@ public class LedgerHandle implements WriteHandle { final LoadingCache bookieFailureHistory; final boolean enableParallelRecoveryRead; final int recoveryReadBatchSize; + ScheduledFuture timeoutFuture = null; /** * Invalid entry id. 
This value is returned from methods which @@ -182,6 +184,19 @@ public class LedgerHandle implements WriteHandle { } }); initializeExplicitLacFlushPolicy(); + + if (bk.getConf().getAddEntryQuorumTimeout() > 0) { + SafeRunnable monitor = new SafeRunnable() { + @Override + public void safeRun() { + monitorPendingAddOps(); + } + }; + this.timeoutFuture = bk.scheduler.scheduleAtFixedRate(monitor, + bk.getConf().getTimeoutMonitorIntervalSec(), + bk.getConf().getTimeoutMonitorIntervalSec(), + TimeUnit.SECONDS); + } } protected void initializeExplicitLacFlushPolicy() { @@ -335,6 +350,9 @@ public class LedgerHandle implements WriteHandle { SyncCloseCallback callback = new SyncCloseCallback(result); asyncClose(callback, null); explicitLacFlushPolicy.stopExplicitLacFlush(); + if (timeoutFuture != null) { + timeoutFuture.cancel(false); + } return result; } @@ -1371,6 +1389,18 @@ public class LedgerHandle implements WriteHandle { asyncCloseInternal(NoopCloseCallback.instance, null, rc); } + private void monitorPendingAddOps() { + int timedOut = 0; + for (PendingAddOp op : pendingAddOps) { + if (op.maybeTimeout()) { + timedOut++; + } + } + if (timedOut > 0) { + LOG.info("Timed out {} add ops", timedOut); + } + } + void errorOutPendingAdds(int rc) { errorOutPendingAdds(rc, drainPendingAddsToErrorOut()); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index d414711..e42bbf3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -25,8 +25,6 @@ import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCountUtil; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import java.util.Map; import java.util.concurrent.RejectedExecutionException; @@ -51,7 
+49,7 @@ import org.slf4j.LoggerFactory; * * */ -class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { +class PendingAddOp extends SafeRunnable implements WriteCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingAddOp.class); ByteBuf payload; @@ -68,8 +66,7 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { boolean isRecoveryAdd = false; long requestTimeNanos; - int timeoutSec; - Timeout timeout = null; + long timeoutNanos; OpStatsLogger addOpLogger; long currentLedgerLength; @@ -91,14 +88,11 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { op.completed = false; op.ackSet = lh.distributionSchedule.getAckSet(); op.addOpLogger = lh.bk.getAddOpLogger(); - if (op.timeout != null) { - op.timeout.cancel(); - } - op.timeout = null; - op.timeoutSec = lh.bk.getConf().getAddEntryQuorumTimeout(); + op.timeoutNanos = lh.bk.addEntryQuorumTimeoutNanos; op.pendingWriteRequests = 0; op.callbackTriggered = false; op.hasRun = false; + op.requestTimeNanos = Long.MAX_VALUE; return op; } @@ -131,9 +125,12 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { ++pendingWriteRequests; } - @Override - public void run(Timeout timeout) { - timeoutQuorumWait(); + boolean maybeTimeout() { + if (MathUtils.elapsedNanos(requestTimeNanos) >= timeoutNanos) { + timeoutQuorumWait(); + return true; + } + return false; } void timeoutQuorumWait() { @@ -220,11 +217,6 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { return; } - if (timeoutSec > -1) { - this.timeout = lh.bk.getBookieClient().scheduleTimeout( - this, timeoutSec, TimeUnit.SECONDS); - } - this.requestTimeNanos = MathUtils.nowInNano(); checkNotNull(lh); checkNotNull(lh.macManager); @@ -340,10 +332,6 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { } void submitCallback(final int rc) { - if (null != timeout) { - timeout.cancel(); - } - if 
(LOG.isDebugEnabled()) { LOG.debug("Submit callback (lid:{}, eid: {}). rc:{}", lh.getId(), entryId, rc); } @@ -429,10 +417,6 @@ class PendingAddOp extends SafeRunnable implements WriteCallback, TimerTask { pendingWriteRequests = 0; callbackTriggered = false; hasRun = false; - if (timeout != null) { - timeout.cancel(); - } - timeout = null; recyclerHandle.recycle(this); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 361a383..966fd42 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -88,6 +88,7 @@ public class ClientConfiguration extends AbstractConfiguration { protected static final String ADD_ENTRY_TIMEOUT_SEC = "addEntryTimeoutSec"; protected static final String ADD_ENTRY_QUORUM_TIMEOUT_SEC = "addEntryQuorumTimeoutSec"; protected static final String READ_ENTRY_TIMEOUT_SEC = "readEntryTimeoutSec"; + protected static final String TIMEOUT_MONITOR_INTERVAL_SEC = "timeoutMonitorIntervalSec"; protected static final String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis"; protected static final String EXPLICIT_LAC_INTERVAL = "explicitLacInterval"; protected static final String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs"; @@ -678,11 +679,41 @@ public class ClientConfiguration extends AbstractConfiguration { } /** - * Get the interval between successive executions of the PerChannelBookieClient's - * TimeoutTask. This value is in milliseconds. Every X milliseconds, the timeout task - * will be executed and it will error out entries that have timed out. + * Get the interval between successive executions of the operation timeout monitor. This value is in seconds. 
+ * + * @see #setTimeoutMonitorIntervalSec(long) + * @return the interval at which request timeouts will be checked + */ + public long getTimeoutMonitorIntervalSec() { + int minTimeout = Math.min(Math.min(getAddEntryQuorumTimeout(), + getAddEntryTimeout()), getReadEntryTimeout()); + return getLong(TIMEOUT_MONITOR_INTERVAL_SEC, Math.max(minTimeout / 2, 1)); + } + + /** + * Set the interval between successive executions of the operation timeout monitor. The value in seconds. + * Every X seconds, all outstanding add and read operations are checked to see if they have been running + * for longer than their configured timeout. Any that have been will be errored out. + * + *

This timeout should be set to a value which is a fraction of the values of + * {@link #getAddEntryQuorumTimeout}, {@link #getAddEntryTimeout} and {@link #getReadEntryTimeout}, + * so that these timeouts run in a timely fashion. + * + * @param timeoutInterval The timeout monitor interval, in seconds + * @return client configuration + */ + public ClientConfiguration setTimeoutMonitorIntervalSec(long timeoutInterval) { + setProperty(TIMEOUT_MONITOR_INTERVAL_SEC, Long.toString(timeoutInterval)); + return this; + } + + /** + * Get the interval between successive executions of the PerChannelBookieClient's TimeoutTask. This value is in + * milliseconds. Every X milliseconds, the timeout task will be executed and it will error out entries that have + * timed out. * *

We do it more aggressive to not accumulate pending requests due to slow responses. + * * @return the interval at which request timeouts will be checked */ @Deprecated @@ -729,6 +760,7 @@ public class ClientConfiguration extends AbstractConfiguration { * * @return tick duration in milliseconds */ + @Deprecated public long getPCBCTimeoutTimerTickDurationMs() { return getLong(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, 100); } @@ -745,6 +777,7 @@ public class ClientConfiguration extends AbstractConfiguration { * tick duration in milliseconds. * @return client configuration. */ + @Deprecated public ClientConfiguration setPCBCTimeoutTimerTickDurationMs(long tickDuration) { setProperty(PCBC_TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration); return this; @@ -759,6 +792,7 @@ public class ClientConfiguration extends AbstractConfiguration { * * @return number of ticks that used for timeout timer. */ + @Deprecated public int getPCBCTimeoutTimerNumTicks() { return getInt(PCBC_TIMEOUT_TIMER_NUM_TICKS, 1024); } @@ -775,6 +809,7 @@ public class ClientConfiguration extends AbstractConfiguration { * number of ticks that used for timeout timer. * @return client configuration. 
*/ + @Deprecated public ClientConfiguration setPCBCTimeoutTimerNumTicks(int numTicks) { setProperty(PCBC_TIMEOUT_TIMER_NUM_TICKS, numTicks); return this; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index ceac697..9b1ca9c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -23,23 +23,23 @@ package org.apache.bookkeeper.proto; import static com.google.common.base.Charsets.UTF_8; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ExtensionRegistry; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.util.HashedWheelTimer; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -76,10 +76,12 @@ public class BookieClient implements PerChannelBookieClientFactory { AtomicLong totalBytesOutstanding = new AtomicLong(); OrderedSafeExecutor executor; + ScheduledExecutorService scheduler; + ScheduledFuture timeoutFuture; + EventLoopGroup eventLoopGroup; final ConcurrentHashMap channels = new ConcurrentHashMap(); - final HashedWheelTimer requestTimer; private final 
ClientAuthProvider.Factory authProviderFactory; private final ExtensionRegistry registry; @@ -93,12 +95,8 @@ public class BookieClient implements PerChannelBookieClientFactory { private final long bookieErrorThresholdPerInterval; public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup, - OrderedSafeExecutor executor) throws IOException { - this(conf, eventLoopGroup, executor, NullStatsLogger.INSTANCE); - } - - public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup, - OrderedSafeExecutor executor, StatsLogger statsLogger) throws IOException { + OrderedSafeExecutor executor, ScheduledExecutorService scheduler, + StatsLogger statsLogger) throws IOException { this.conf = conf; this.eventLoopGroup = eventLoopGroup; this.executor = executor; @@ -110,11 +108,21 @@ public class BookieClient implements PerChannelBookieClientFactory { this.statsLogger = statsLogger; this.numConnectionsPerBookie = conf.getNumChannelsPerBookie(); - this.requestTimer = new HashedWheelTimer( - new ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(), - conf.getPCBCTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, - conf.getPCBCTimeoutTimerNumTicks()); this.bookieErrorThresholdPerInterval = conf.getBookieErrorThresholdPerInterval(); + + this.scheduler = scheduler; + if (conf.getAddEntryTimeout() > 0 || conf.getReadEntryTimeout() > 0) { + SafeRunnable monitor = new SafeRunnable() { + @Override + public void safeRun() { + monitorPendingOperations(); + } + }; + this.timeoutFuture = this.scheduler.scheduleAtFixedRate(monitor, + conf.getTimeoutMonitorIntervalSec(), + conf.getTimeoutMonitorIntervalSec(), + TimeUnit.SECONDS); + } } private int getRc(int rc) { @@ -145,8 +153,8 @@ public class BookieClient implements PerChannelBookieClientFactory { @Override public PerChannelBookieClient create(BookieSocketAddress address, PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory shFactory) throws SecurityException { - return new 
PerChannelBookieClient(conf, executor, eventLoopGroup, address, requestTimer, statsLogger, - authProviderFactory, registry, pcbcPool, shFactory); + return new PerChannelBookieClient(conf, executor, eventLoopGroup, address, statsLogger, + authProviderFactory, registry, pcbcPool, shFactory); } private PerChannelBookieClientPool lookupClient(BookieSocketAddress addr) { @@ -521,12 +529,14 @@ public class BookieClient implements PerChannelBookieClientFactory { } } - public boolean isClosed() { - return closed; + private void monitorPendingOperations() { + for (PerChannelBookieClientPool clientPool : channels.values()) { + clientPool.checkTimeoutOnPendingOperations(); + } } - public Timeout scheduleTimeout(TimerTask task, long timeoutSec, TimeUnit timeUnit) { - return requestTimer.newTimeout(task, timeoutSec, timeUnit); + public boolean isClosed() { + return closed; } public void close() { @@ -538,11 +548,13 @@ public class BookieClient implements PerChannelBookieClientFactory { } channels.clear(); authProviderFactory.close(); + + if (timeoutFuture != null) { + timeoutFuture.cancel(false); + } } finally { closeLock.writeLock().unlock(); } - // Shut down the timeout executor. 
- this.requestTimer.stop(); } private static class Counter { @@ -599,7 +611,10 @@ public class BookieClient implements PerChannelBookieClientFactory { .name("BookieClientWorker") .numThreads(1) .build(); - BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("BookKeeperClientScheduler")); + BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor, + scheduler, NullStatsLogger.INSTANCE); BookieSocketAddress addr = new BookieSocketAddress(args[0], Integer.parseInt(args[1])); for (int i = 0; i < 100000; i++) { @@ -608,6 +623,7 @@ public class BookieClient implements PerChannelBookieClientFactory { } counter.wait(0); System.out.println("Total = " + counter.total()); + scheduler.shutdown(); eventLoopGroup.shutdownGracefully(); executor.shutdown(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java index 3ac5cb6..41233cf 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/DefaultPerChannelBookieClientPool.java @@ -95,6 +95,13 @@ class DefaultPerChannelBookieClientPool implements PerChannelBookieClientPool, } @Override + public void checkTimeoutOnPendingOperations() { + for (int i = 0; i < clients.length; i++) { + clients[i].checkTimeoutOnPendingOperations(); + } + } + + @Override public void recordError() { errorCounter.incrementAndGet(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 97bdaea..b128bc8 100644 --- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -54,11 +54,8 @@ import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.ssl.SslHandler; -import io.netty.util.HashedWheelTimer; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; -import io.netty.util.Timeout; -import io.netty.util.TimerTask; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; @@ -70,12 +67,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.BiPredicate; import javax.net.ssl.SSLPeerUnverifiedException; @@ -147,9 +146,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { final BookieSocketAddress addr; final EventLoopGroup eventLoopGroup; final OrderedSafeExecutor executor; - final HashedWheelTimer requestTimer; - final int addEntryTimeout; - final int readEntryTimeout; + final long addEntryTimeoutNanos; + final long readEntryTimeoutNanos; final int maxFrameSize; final int getBookieInfoTimeout; final int startTLSTimeout; @@ -203,7 +201,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr) throws SecurityException { - this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, NullStatsLogger.INSTANCE, null, null, + this(new ClientConfiguration(), executor, eventLoopGroup, 
addr, NullStatsLogger.INSTANCE, null, null, null); } @@ -211,24 +209,22 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { BookieSocketAddress addr, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry) throws SecurityException { - this(new ClientConfiguration(), executor, eventLoopGroup, addr, null, NullStatsLogger.INSTANCE, + this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, authProviderFactory, extRegistry, null); } public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, - HashedWheelTimer requestTimer, StatsLogger parentStatsLogger, - ClientAuthProvider.Factory authProviderFactory, + StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, PerChannelBookieClientPool pcbcPool) throws SecurityException { - this(conf, executor, eventLoopGroup, addr, null, NullStatsLogger.INSTANCE, + this(conf, executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, authProviderFactory, extRegistry, pcbcPool, null); } public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, - HashedWheelTimer requestTimer, StatsLogger parentStatsLogger, - ClientAuthProvider.Factory authProviderFactory, + StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory shFactory) throws SecurityException { @@ -242,9 +238,8 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { this.eventLoopGroup = eventLoopGroup; } this.state = ConnectionState.DISCONNECTED; - this.requestTimer = requestTimer; - this.addEntryTimeout = conf.getAddEntryTimeout(); - this.readEntryTimeout = conf.getReadEntryTimeout(); + this.addEntryTimeoutNanos = 
TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout()); + this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout()); this.getBookieInfoTimeout = conf.getBookieInfoTimeout(); this.startTLSTimeout = conf.getStartTLSTimeout(); this.useV2WireProtocol = conf.getUseV2WireProtocol(); @@ -768,6 +763,30 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { writeAndFlush(channel, completionKey, getBookieInfoRequest); } + private static final BiPredicate timeoutCheck = (key, value) -> { + return value.maybeTimeout(); + }; + + public void checkTimeoutOnPendingOperations() { + int timedOutOperations = completionObjects.removeIf(timeoutCheck); + + synchronized (this) { + Iterator iterator = completionObjectsV2Conflicts.values().iterator(); + while (iterator.hasNext()) { + CompletionValue value = iterator.next(); + if (value.maybeTimeout()) { + ++timedOutOperations; + iterator.remove(); + } + } + } + + if (timedOutOperations > 0) { + LOG.info("Timed-out {} operations to channel {} for {}", + timedOutOperations, channel, addr); + } + } + /** * Disconnects the bookie client. It can be reused. 
*/ @@ -819,6 +838,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { cf.awaitUninterruptibly(); } } + } private ChannelFuture closeChannel(Channel c) { @@ -1235,14 +1255,12 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { protected long ledgerId; protected long entryId; protected long startTime; - protected Timeout timeout; public CompletionValue(String operationName, Object ctx, long ledgerId, long entryId, OpStatsLogger opLogger, - OpStatsLogger timeoutOpLogger, - Timeout timeout) { + OpStatsLogger timeoutOpLogger) { this.operationName = operationName; this.ctx = ctx; this.ledgerId = ledgerId; @@ -1250,19 +1268,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { this.startTime = MathUtils.nowInNano(); this.opLogger = opLogger; this.timeoutOpLogger = timeoutOpLogger; - this.timeout = timeout; } private long latency() { return MathUtils.elapsedNanos(startTime); } - void cancelTimeoutAndLogOp(int rc) { - Timeout t = timeout; - if (null != t) { - t.cancel(); - } - + void logOpResult(int rc) { if (rc != BKException.Code.OK) { opLogger.registerFailedEvent(latency(), TimeUnit.NANOSECONDS); } else { @@ -1275,6 +1287,15 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { } } + boolean maybeTimeout() { + if (MathUtils.elapsedNanos(startTime) >= readEntryTimeoutNanos) { + timeout(); + return true; + } else { + return false; + } + } + void timeout() { errorOut(BKException.Code.TimeoutException); timeoutOpLogger.registerSuccessfulEvent(latency(), @@ -1344,14 +1365,13 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { final long ledgerId) { super("WriteLAC", originalCtx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, - writeLacOpLogger, writeLacTimeoutOpLogger, - scheduleTimeout(key, addEntryTimeout)); + writeLacOpLogger, writeLacTimeoutOpLogger); this.cb = new WriteLacCallback() { @Override public void writeLacComplete(int rc, long 
ledgerId, BookieSocketAddress addr, Object ctx) { - cancelTimeoutAndLogOp(rc); + logOpResult(rc); originalCallback.writeLacComplete(rc, ledgerId, addr, originalCtx); key.release(); @@ -1392,15 +1412,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { ReadLacCallback originalCallback, final Object ctx, final long ledgerId) { super("ReadLAC", ctx, ledgerId, BookieProtocol.LAST_ADD_CONFIRMED, - readLacOpLogger, readLacTimeoutOpLogger, - scheduleTimeout(key, readEntryTimeout)); + readLacOpLogger, readLacTimeoutOpLogger); this.cb = new ReadLacCallback() { @Override public void readLacComplete(int rc, long ledgerId, ByteBuf lacBuffer, ByteBuf lastEntryBuffer, Object ctx) { - cancelTimeoutAndLogOp(rc); + logOpResult(rc); originalCallback.readLacComplete( rc, ledgerId, lacBuffer, lastEntryBuffer, ctx); key.release(); @@ -1452,15 +1471,14 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { final Object originalCtx, long ledgerId, final long entryId) { super("Read", originalCtx, ledgerId, entryId, - readEntryOpLogger, readTimeoutOpLogger, - scheduleTimeout(key, readEntryTimeout)); + readEntryOpLogger, readTimeoutOpLogger); this.cb = new ReadEntryCallback() { @Override public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) { - cancelTimeoutAndLogOp(rc); + logOpResult(rc); originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx); @@ -1550,12 +1568,11 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { public StartTLSCompletion(final CompletionKey key) { super("StartTLS", null, -1, -1, - startTLSOpLogger, startTLSTimeoutOpLogger, - scheduleTimeout(key, startTLSTimeout)); + startTLSOpLogger, startTLSTimeoutOpLogger); this.cb = new StartTLSCallback() { @Override public void startTLSComplete(int rc, Object ctx) { - cancelTimeoutAndLogOp(rc); + logOpResult(rc); key.release(); } }; @@ -1602,13 +1619,12 @@ public class PerChannelBookieClient 
extends ChannelInboundHandlerAdapter { final GetBookieInfoCallback origCallback, final Object origCtx) { super("GetBookieInfo", origCtx, 0L, 0L, - getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger, - scheduleTimeout(key, getBookieInfoTimeout)); + getBookieInfoOpLogger, getBookieInfoTimeoutOpLogger); this.cb = new GetBookieInfoCallback() { @Override public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) { - cancelTimeoutAndLogOp(rc); + logOpResult(rc); origCallback.getBookieInfoComplete(rc, bInfo, origCtx); key.release(); } @@ -1668,8 +1684,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { WriteCallback originalCallback = null; AddCompletion(Recycler.Handle handle) { - super("Add", null, -1, -1, - addEntryOpLogger, addTimeoutOpLogger, null); + super("Add", null, -1, -1, addEntryOpLogger, addTimeoutOpLogger); this.handle = handle; } @@ -1683,20 +1698,29 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { this.ledgerId = ledgerId; this.entryId = entryId; this.startTime = MathUtils.nowInNano(); - this.timeout = scheduleTimeout(key, addEntryTimeout); } @Override public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { - cancelTimeoutAndLogOp(rc); + logOpResult(rc); originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx); key.release(); handle.recycle(this); } @Override + boolean maybeTimeout() { + if (MathUtils.elapsedNanos(startTime) >= addEntryTimeoutNanos) { + timeout(); + return true; + } else { + return false; + } + } + + @Override public void errorOut() { errorOut(BKException.Code.BookieHandleNotAvailableException); } @@ -1739,14 +1763,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { return new V3CompletionKey(txnId, operationType); } - Timeout scheduleTimeout(CompletionKey key, long timeout) { - if (null != requestTimer) { - return requestTimer.newTimeout(key, timeout, TimeUnit.SECONDS); - } else { - 
return null; - } - } - class V3CompletionKey extends CompletionKey { public V3CompletionKey(long txnId, OperationType operationType) { @@ -1774,7 +1790,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { } - abstract class CompletionKey implements TimerTask { + abstract class CompletionKey { final long txnId; OperationType operationType; @@ -1784,17 +1800,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { this.operationType = operationType; } - @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled()) { - return; - } - CompletionValue completion = completionObjects.remove(this); - if (completion != null) { - completion.timeout(); - } - } - public void release() {} } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java index bd07a4e..80f00a5 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClientPool.java @@ -46,6 +46,12 @@ interface PerChannelBookieClientPool { void recordError(); /** + * Check if any ops on any channel needs to be timed out. + * This is called on all channels, even if the channel is not yet connected. + */ + void checkTimeoutOnPendingOperations(); + + /** * Disconnect the connections in the pool. 
* * @param wait diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java index d2bff33..bf6c230 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java @@ -25,8 +25,11 @@ import static org.junit.Assert.assertTrue; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -36,6 +39,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol; +import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.junit.After; @@ -53,6 +57,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { DigestType digestType; public EventLoopGroup eventLoopGroup; public OrderedSafeExecutor executor; + private ScheduledExecutorService scheduler; public TestGetBookieInfoTimeout() { super(10); @@ -68,10 +73,13 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { .name("BKClientOrderedSafeExecutor") .numThreads(2) .build(); + scheduler = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("BookKeeperClientScheduler")); } @After public void tearDown() throws Exception { + scheduler.shutdown(); 
eventLoopGroup.shutdownGracefully(); executor.shutdown(); } @@ -99,7 +107,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { // try to get bookie info from the sleeping bookie. It should fail with timeout error BookieSocketAddress addr = new BookieSocketAddress(bookieToSleep.getSocketAddress().getHostString(), bookieToSleep.getPort()); - BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor); + BookieClient bc = new BookieClient(cConf, eventLoopGroup, executor, scheduler, NullStatsLogger.INSTANCE); long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 6bf05ae..b799635 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -27,12 +27,15 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BKException.Code; @@ -48,6 +51,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCall import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.proto.BookkeeperProtocol; +import 
org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.junit.After; @@ -64,6 +68,7 @@ public class BookieClientTest { public EventLoopGroup eventLoopGroup; public OrderedSafeExecutor executor; + private ScheduledExecutorService scheduler; ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); @Before @@ -83,10 +88,13 @@ public class BookieClientTest { .name("BKClientOrderedSafeExecutor") .numThreads(2) .build(); + scheduler = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("BookKeeperClientScheduler")); } @After public void tearDown() throws Exception { + scheduler.shutdown(); bs.shutdown(); recursiveDelete(tmpDir); eventLoopGroup.shutdownGracefully(); @@ -146,7 +154,8 @@ public class BookieClientTest { BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port); ResultStruct arc = new ResultStruct(); - BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); + BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor, + scheduler, NullStatsLogger.INSTANCE); ByteBuf bb = createByteBuffer(1, 1, 1); bc.addEntry(addr, 1, passwd, 1, bb, wrcb, arc, BookieProtocol.FLAG_NONE); synchronized (arc) { @@ -246,7 +255,8 @@ public class BookieClientTest { public void testNoLedger() throws Exception { ResultStruct arc = new ResultStruct(); BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", port); - BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); + BookieClient bc = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor, + scheduler, NullStatsLogger.INSTANCE); synchronized (arc) { bc.readEntry(addr, 2, 13, recb, arc); arc.wait(1000); @@ -257,7 +267,8 @@ public class BookieClientTest { @Test public void testGetBookieInfo() throws IOException, InterruptedException { BookieSocketAddress addr = new 
BookieSocketAddress("127.0.0.1", port); - BookieClient bc = new BookieClient(new ClientConfiguration(), new NioEventLoopGroup(), executor); + BookieClient bc = new BookieClient(new ClientConfiguration(), new NioEventLoopGroup(), executor, + scheduler, NullStatsLogger.INSTANCE); long flags = BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE | BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java deleted file mode 100644 index fc20476..0000000 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/LoopbackClient.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -package org.apache.bookkeeper.test; - -import io.netty.buffer.Unpooled; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; - -import java.io.IOException; -import java.util.Arrays; - -import org.apache.bookkeeper.conf.ClientConfiguration; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.BookieClient; -import org.apache.bookkeeper.proto.BookieProtocol; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; -import org.apache.bookkeeper.util.OrderedSafeExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class tests BookieClient. It just sends the a new entry to itself. - */ -class LoopbackClient implements WriteCallback { - - private static final Logger LOG = LoggerFactory.getLogger(LoopbackClient.class); - - BookieClient client; - static int recvTimeout = 2000; - long begin = 0; - int limit; - OrderedSafeExecutor executor; - - static class Counter { - int c; - int limit; - - Counter(int limit) { - this.c = 0; - this.limit = limit; - } - - synchronized void increment() { - if (++c == limit) { - this.notify(); - } - } - } - - LoopbackClient(EventLoopGroup eventLoopGroup, OrderedSafeExecutor executor, long begin, int limit) - throws IOException { - this.client = new BookieClient(new ClientConfiguration(), eventLoopGroup, executor); - this.begin = begin; - } - - void write(long ledgerId, long entry, byte[] data, BookieSocketAddress addr, WriteCallback cb, Object ctx) - throws IOException, InterruptedException { - LOG.info("Ledger id: " + ledgerId + ", Entry: " + entry); - byte[] passwd = new byte[20]; - Arrays.fill(passwd, (byte) 'a'); - - client.addEntry(addr, ledgerId, passwd, entry, Unpooled.wrappedBuffer(data), cb, ctx, BookieProtocol.FLAG_NONE); - } - - @Override - public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddress addr, Object ctx) { - Counter counter = (Counter) ctx; - 
- counter.increment(); - } - - public static void main(String args[]) { - byte[] data = new byte[Integer.parseInt(args[0])]; - Integer limit = Integer.parseInt(args[1]); - Counter c = new Counter(limit); - long ledgerId = Long.valueOf("0").longValue(); - long begin = System.currentTimeMillis(); - - LoopbackClient lb; - EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() - .name("BookieClientScheduler") - .numThreads(2) - .build(); - try { - BookieSocketAddress addr = new BookieSocketAddress("127.0.0.1", Integer.valueOf(args[2]).intValue()); - lb = new LoopbackClient(eventLoopGroup, executor, begin, limit.intValue()); - - for (int i = 0; i < limit; i++) { - lb.write(ledgerId, i, data, addr, lb, c); - } - - synchronized (c) { - c.wait(); - System.out.println("Time to write all entries: " + (System.currentTimeMillis() - begin)); - } - } catch (IOException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - -} -- To stop receiving notification emails like this one, please contact ['"commits@bookkeeper.apache.org" <commits@bookkeeper.apache.org>'].