Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 6005C200C21 for ; Sun, 5 Feb 2017 18:09:08 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 5EC7D160B59; Sun, 5 Feb 2017 17:09:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8A6D0160B48 for ; Sun, 5 Feb 2017 18:09:06 +0100 (CET) Received: (qmail 7026 invoked by uid 500); 5 Feb 2017 17:09:05 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 6991 invoked by uid 99); 5 Feb 2017 17:09:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 05 Feb 2017 17:09:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5CC36E0210; Sun, 5 Feb 2017 17:09:05 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aleksey@apache.org To: commits@cassandra.apache.org Date: Sun, 05 Feb 2017 17:09:05 -0000 Message-Id: <64a55352a4184f76ba5489a3bc9ffdeb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/8] cassandra git commit: Move to FastThreadLocalThread and FastThreadLocal archived-at: Sun, 05 Feb 2017 17:09:08 -0000 Repository: cassandra Updated Branches: refs/heads/trunk ded6b7010 -> 45c92ba91 Move to FastThreadLocalThread and FastThreadLocal patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13034 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cecbe17e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cecbe17e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cecbe17e Branch: refs/heads/trunk Commit: cecbe17e3eafc052acc13950494f7dddf026aa54 Parents: f71e7e1 Author: Robert Stupp Authored: Tue Dec 13 17:37:09 2016 +0100 Committer: Aleksey Yeschenko Committed: Sun Feb 5 16:54:19 2017 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../concurrent/NamedThreadFactory.java | 42 ++++++++- .../functions/ThreadAwareSecurityManager.java | 3 +- .../AbstractCommitLogSegmentManager.java | 2 +- .../db/commitlog/AbstractCommitLogService.java | 2 +- .../db/compaction/CompactionManager.java | 3 +- .../apache/cassandra/db/marshal/AsciiType.java | 3 +- .../hints/EncryptedChecksummedDataInput.java | 3 +- .../cassandra/index/sasi/TermIterator.java | 5 +- .../io/compress/DeflateCompressor.java | 11 +-- .../cassandra/net/OutboundTcpConnection.java | 5 +- .../apache/cassandra/repair/RepairRunnable.java | 6 +- .../scheduler/RoundRobinScheduler.java | 2 +- .../cassandra/security/CipherFactory.java | 3 +- .../cassandra/security/EncryptionUtils.java | 3 +- .../serializers/TimestampSerializer.java | 7 +- .../cassandra/service/StorageService.java | 8 +- .../cassandra/thrift/ThriftSessionManager.java | 4 +- .../cassandra/utils/CoalescingStrategies.java | 25 +++--- .../org/apache/cassandra/utils/FBUtilities.java | 15 ++-- .../org/apache/cassandra/cql3/ViewLongTest.java | 3 +- .../test/microbench/FastThreadExecutor.java | 65 +------------- .../test/microbench/FastThreadLocalBench.java | 92 ++++++++++++++++++++ .../cassandra/cache/CacheProviderTest.java | 3 +- .../cassandra/concurrent/WaitQueueTest.java | 4 +- .../cassandra/db/RecoveryManagerTest.java | 22 +++-- .../apache/cassandra/hints/HintsBufferTest.java | 3 +- .../io/sstable/IndexSummaryManagerTest.java | 3 +- .../io/sstable/SSTableRewriterTest.java | 5 +- .../apache/cassandra/service/RemoveTest.java | 26 +++--- .../apache/cassandra/utils/TopKSamplerTest.java | 5 +- .../apache/cassandra/stress/StressServer.java | 6 +- .../operations/userdefined/TokenRangeQuery.java | 3 +- 33 files changed, 239 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0836475..7c1cd82 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,10 +1,12 @@ 3.11.0 + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034) Merged from 3.0: * Fix handling of partition with partition-level deletion plus live rows in sstabledump (CASSANDRA-13177) * Provide user workaround when system_schema.columns does not contain entries for a table that's in system_schema.tables (CASSANDRA-13180) + 3.10 * Fix secondary index queries regression (CASSANDRA-13013) * Add duration type to the protocol V5 (CASSANDRA-12850) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java index 22193c4..5d89f6c 100644 --- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java +++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java @@ -20,6 +20,8 @@ package org.apache.cassandra.concurrent; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import com.google.common.annotations.VisibleForTesting; + import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.FastThreadLocalThread; @@ -58,9 +60,8 @@ public class NamedThreadFactory implements ThreadFactory public Thread newThread(Runnable runnable) { String name = id + ':' + n.getAndIncrement(); - Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name); + Thread thread = createThread(threadGroup, runnable, name, true); thread.setPriority(priority); - thread.setDaemon(true); if (contextClassLoader != null) thread.setContextClassLoader(contextClassLoader); return thread; @@ -75,11 +76,44 @@ public class NamedThreadFactory implements ThreadFactory { return () -> { - try { + try + { r.run(); - } finally { + } + finally + { FastThreadLocal.removeAll(); } }; } + + private static final AtomicInteger threadCounter = new AtomicInteger(); + + @VisibleForTesting + public static Thread createThread(Runnable runnable) + { + return createThread(null, runnable, "anonymous-" + threadCounter.incrementAndGet()); + } + + public static Thread createThread(Runnable runnable, String name) + { + return createThread(null, runnable, name); + } + + public static Thread createThread(Runnable runnable, String name, boolean daemon) + { + return createThread(null, runnable, name, daemon); + } + + public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name) + { + return createThread(threadGroup, runnable, name, false); + } + + public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, boolean daemon) + { + Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name); + thread.setDaemon(daemon); + return thread; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java index 676117d..2e4bb4d 100644 --- a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java +++ b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java @@ -36,6 +36,7 @@ import ch.qos.logback.classic.LoggerContext; import ch.qos.logback.classic.spi.TurboFilterList; import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter; import ch.qos.logback.classic.turbo.TurboFilter; +import io.netty.util.concurrent.FastThreadLocal; /** * Custom {@link SecurityManager} and {@link Policy} implementation that only performs access checks @@ -175,7 +176,7 @@ public final class ThreadAwareSecurityManager extends SecurityManager }); } - private static final ThreadLocal initializedThread = new ThreadLocal<>(); + private static final FastThreadLocal initializedThread = new FastThreadLocal<>(); private ThreadAwareSecurityManager() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java index eff35f4..0ab941b 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java @@ -147,7 +147,7 @@ public abstract class AbstractCommitLogSegmentManager }; shutdown = false; - managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR"); + managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR"); managerThread.start(); // for simplicity, ensure the first segment is allocated before continuing http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java index 834aa0d..71100a3 100644 --- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java +++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java @@ -150,7 +150,7 @@ public abstract class AbstractCommitLogService }; shutdown = false; - thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name); + thread = NamedThreadFactory.createThread(runnable, name); thread.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index f34b8a9..f16c1de 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.cache.AutoSavingCache; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; @@ -93,7 +94,7 @@ public class CompactionManager implements CompactionManagerMBean // A thread local that tells us if the current thread is owned by the compaction manager. Used // by CounterContext to figure out if it should log a warning for invalid counter shards. - public static final ThreadLocal isCompactionManager = new ThreadLocal() + public static final FastThreadLocal isCompactionManager = new FastThreadLocal() { @Override protected Boolean initialValue() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/marshal/AsciiType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AsciiType.java b/src/java/org/apache/cassandra/db/marshal/AsciiType.java index 089e388..3cd45de 100644 --- a/src/java/org/apache/cassandra/db/marshal/AsciiType.java +++ b/src/java/org/apache/cassandra/db/marshal/AsciiType.java @@ -23,6 +23,7 @@ import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import java.nio.charset.CharacterCodingException; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.cql3.Constants; import org.apache.cassandra.cql3.Json; @@ -40,7 +41,7 @@ public class AsciiType extends AbstractType AsciiType() {super(ComparisonType.BYTE_ORDER);} // singleton - private final ThreadLocal encoder = new ThreadLocal() + private final FastThreadLocal encoder = new FastThreadLocal() { @Override protected CharsetEncoder initialValue() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java index b335226..b01161d 100644 --- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java +++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java @@ -23,6 +23,7 @@ import javax.crypto.Cipher; import com.google.common.annotations.VisibleForTesting; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.security.EncryptionUtils; import org.apache.cassandra.hints.CompressedChecksummedDataInput.Position; import org.apache.cassandra.io.FSReadError; @@ -31,7 +32,7 @@ import org.apache.cassandra.io.util.ChannelProxy; public class EncryptedChecksummedDataInput extends ChecksummedDataInput { - private static final ThreadLocal reusableBuffers = new ThreadLocal() + private static final FastThreadLocal reusableBuffers = new FastThreadLocal() { protected ByteBuffer initialValue() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/index/sasi/TermIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/sasi/TermIterator.java b/src/java/org/apache/cassandra/index/sasi/TermIterator.java index 1ddfcb9..03dea18 100644 --- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java +++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java @@ -23,6 +23,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder; @@ -42,7 +43,7 @@ public class TermIterator extends RangeIterator { private static final Logger logger = LoggerFactory.getLogger(TermIterator.class); - private static final ThreadLocal SEARCH_EXECUTOR = new ThreadLocal() + private static final FastThreadLocal SEARCH_EXECUTOR = new FastThreadLocal() { public ExecutorService initialValue() { @@ -59,7 +60,7 @@ public class TermIterator extends RangeIterator public Thread newThread(Runnable task) { - return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }}; + return NamedThreadFactory.createThread(task, currentThread + "-SEARCH-" + count.incrementAndGet(), true); } }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java index a2ed65c..8557f5f 100644 --- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java +++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.io.compress; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.schema.CompressionParams; import java.io.IOException; @@ -32,7 +33,7 @@ public class DeflateCompressor implements ICompressor { public static final DeflateCompressor instance = new DeflateCompressor(); - private static final ThreadLocal threadLocalScratchBuffer = new ThreadLocal() + private static final FastThreadLocal threadLocalScratchBuffer = new FastThreadLocal() { @Override protected byte[] initialValue() @@ -46,8 +47,8 @@ public class DeflateCompressor implements ICompressor return threadLocalScratchBuffer.get(); } - private final ThreadLocal deflater; - private final ThreadLocal inflater; + private final FastThreadLocal deflater; + private final FastThreadLocal inflater; public static DeflateCompressor create(Map compressionOptions) { @@ -57,7 +58,7 @@ public class DeflateCompressor implements ICompressor private DeflateCompressor() { - deflater = new ThreadLocal() + deflater = new FastThreadLocal() { @Override protected Deflater initialValue() @@ -65,7 +66,7 @@ public class DeflateCompressor implements ICompressor return new Deflater(); } }; - inflater = new ThreadLocal() + inflater = new FastThreadLocal() { @Override protected Inflater initialValue() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 1843e7b..0693ac3 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -503,7 +503,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread { final AtomicInteger version = new AtomicInteger(NO_VERSION); final CountDownLatch versionLatch = new CountDownLatch(1); - Runnable r = () -> + NamedThreadFactory.createThread(() -> { try { @@ -523,8 +523,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread //unblock the waiting thread on either success or fail versionLatch.countDown(); } - }; - new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start(); + }, "HANDSHAKE-" + poolReference.endPoint()).start(); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 013dcec..c9eed54 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -73,6 +73,8 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti private final List listeners = new ArrayList<>(); + private static final AtomicInteger threadCounter = new AtomicInteger(1); + public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) { this.storageService = storageService; @@ -376,7 +378,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti private Thread createQueryThread(final int cmd, final UUID sessionId) { - return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable() + return NamedThreadFactory.createThread(new WrappedRunnable() { // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. @@ -443,6 +445,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti seen[si].clear(); } } - })); + }, "Repair-Runnable-" + threadCounter.incrementAndGet()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java index 904deb3..fd967f3 100644 --- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java +++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java @@ -67,7 +67,7 @@ public class RoundRobinScheduler implements IRequestScheduler schedule(); } }; - Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER"); + Thread scheduler = NamedThreadFactory.createThread(runnable, "REQUEST-SCHEDULER"); scheduler.start(); logger.info("Started the RoundRobin Request Scheduler"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/CipherFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java index 7c1495a..3f5c5f3 100644 --- a/src/java/org/apache/cassandra/security/CipherFactory.java +++ b/src/java/org/apache/cassandra/security/CipherFactory.java @@ -39,6 +39,7 @@ import com.google.common.cache.RemovalNotification; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.config.TransparentDataEncryptionOptions; /** @@ -54,7 +55,7 @@ public class CipherFactory * Bonus points if you can avoid calling (@code Cipher#init); hence, the point of the supporting struct * for caching Cipher instances. */ - private static final ThreadLocal cipherThreadLocal = new ThreadLocal<>(); + private static final FastThreadLocal cipherThreadLocal = new FastThreadLocal<>(); private final SecureRandom secureRandom; private final LoadingCache cache; http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/EncryptionUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java index bb61260..855e2a9 100644 --- a/src/java/org/apache/cassandra/security/EncryptionUtils.java +++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java @@ -28,6 +28,7 @@ import javax.crypto.ShortBufferException; import com.google.common.base.Preconditions; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.db.commitlog.EncryptedSegment; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ChannelProxy; @@ -45,7 +46,7 @@ public class EncryptionUtils public static final int COMPRESSED_BLOCK_HEADER_SIZE = 4; public static final int ENCRYPTED_BLOCK_HEADER_SIZE = 8; - private static final ThreadLocal reusableBuffers = new ThreadLocal() + private static final FastThreadLocal reusableBuffers = new FastThreadLocal() { protected ByteBuffer initialValue() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/serializers/TimestampSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java index a4a6f80..ac75d4b 100644 --- a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java +++ b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.serializers; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.utils.ByteBufferUtil; import java.nio.ByteBuffer; @@ -90,7 +91,7 @@ public class TimestampSerializer implements TypeSerializer private static final String DEFAULT_FORMAT = dateStringPatterns[6]; private static final Pattern timestampPattern = Pattern.compile("^-?\\d+$"); - private static final ThreadLocal FORMATTER = new ThreadLocal() + private static final FastThreadLocal FORMATTER = new FastThreadLocal() { protected SimpleDateFormat initialValue() { @@ -99,7 +100,7 @@ public class TimestampSerializer implements TypeSerializer }; private static final String UTC_FORMAT = dateStringPatterns[40]; - private static final ThreadLocal FORMATTER_UTC = new ThreadLocal() + private static final FastThreadLocal FORMATTER_UTC = new FastThreadLocal() { protected SimpleDateFormat initialValue() { @@ -110,7 +111,7 @@ public class TimestampSerializer implements TypeSerializer }; private static final String TO_JSON_FORMAT = dateStringPatterns[19]; - private static final ThreadLocal FORMATTER_TO_JSON = new ThreadLocal() + private static final FastThreadLocal FORMATTER_TO_JSON = new FastThreadLocal() { protected SimpleDateFormat initialValue() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 720269f..b64cf13 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -126,6 +126,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE @Deprecated private final LegacyJMXProgressSupport legacyProgressSupport; + private static final AtomicInteger threadCounter = new AtomicInteger(1); + private static int getRingDelay() { String newdelay = System.getProperty("cassandra.ring_delay_ms"); @@ -634,7 +636,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // daemon threads, like our executors', continue to run while shutdown hooks are invoked - drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable() + drainOnShutdown = NamedThreadFactory.createThread(new WrappedRunnable() { @Override public void runMayThrow() throws InterruptedException, ExecutionException, IOException @@ -649,7 +651,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory()); logbackHook.run(); } - }), "StorageServiceShutdownHook"); + }, "StorageServiceShutdownHook"); Runtime.getRuntime().addShutdownHook(drainOnShutdown); replacing = isReplacing(); @@ -3552,7 +3554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return 0; int cmd = nextRepairCommand.incrementAndGet(); - new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start(); + NamedThreadFactory.createThread(createRepairTask(cmd, keyspace, options, legacy), "Repair-Task-" + threadCounter.incrementAndGet()).start(); return cmd; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java index 7d22507..60da3b4 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java +++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java @@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocal; + /** * Encapsulates the current client state (session). * @@ -35,7 +37,7 @@ public class ThriftSessionManager private static final Logger logger = LoggerFactory.getLogger(ThriftSessionManager.class); public final static ThriftSessionManager instance = new ThriftSessionManager(); - private final ThreadLocal remoteSocket = new ThreadLocal<>(); + private final FastThreadLocal remoteSocket = new FastThreadLocal<>(); private final ConcurrentHashMap activeSocketSessions = new ConcurrentHashMap<>(); /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/CoalescingStrategies.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java index 16a64df..0aa980f 100644 --- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java +++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.utils; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Config; import org.apache.cassandra.io.util.FileUtils; import org.slf4j.Logger; @@ -127,25 +128,21 @@ public class CoalescingStrategies this.displayName = displayName; if (DEBUG_COALESCING) { - new Thread(displayName + " debug thread") + NamedThreadFactory.createThread(() -> { - @Override - public void run() + while (true) { - while (true) + try { - try - { - Thread.sleep(5000); - } - catch (InterruptedException e) - { - throw new AssertionError(); - } - shouldLogAverage = true; + Thread.sleep(5000); } + catch (InterruptedException e) + { + throw new AssertionError(); + } + shouldLogAverage = true; } - }.start(); + }, displayName + " debug thread").start(); } RandomAccessFile rasTemp = null; ByteBuffer logBufferTemp = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index a925c0e..2138ea5 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.IAuthorizer; import org.apache.cassandra.auth.IRoleManager; @@ -87,28 +88,22 @@ public class FBUtilities return Runtime.getRuntime().availableProcessors(); } - private static final ThreadLocal localMD5Digest = new ThreadLocal() + private static final FastThreadLocal localMD5Digest = new FastThreadLocal() { @Override protected MessageDigest initialValue() { return newMessageDigest("MD5"); } - - @Override - public MessageDigest get() - { - MessageDigest digest = super.get(); - digest.reset(); - return digest; - } }; public static final int MAX_UNSIGNED_SHORT = 0xFFFF; public static MessageDigest threadLocalMD5Digest() { - return localMD5Digest.get(); + MessageDigest md = localMD5Digest.get(); + md.reset(); + return md; } public static MessageDigest newMessageDigest(String algorithm) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/long/org/apache/cassandra/cql3/ViewLongTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/cql3/ViewLongTest.java b/test/long/org/apache/cassandra/cql3/ViewLongTest.java index a5d17ea..590f148 100644 --- a/test/long/org/apache/cassandra/cql3/ViewLongTest.java +++ b/test/long/org/apache/cassandra/cql3/ViewLongTest.java @@ -33,6 +33,7 @@ import com.datastax.driver.core.Row; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.core.exceptions.WriteTimeoutException; import org.apache.cassandra.batchlog.BatchlogManager; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.WrappedRunnable; @@ -90,7 +91,7 @@ public class ViewLongTest extends CQLTester for (int i = 0; i < writers; i++) { final int writer = i; - Thread t = new Thread(new WrappedRunnable() + Thread t = NamedThreadFactory.createThread(new WrappedRunnable() { public void runMayThrow() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java index d0b4442..5644e4f 100644 --- a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java +++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java @@ -18,14 +18,11 @@ package org.apache.cassandra.test.microbench; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import io.netty.util.concurrent.FastThreadLocalThread; +import io.netty.util.concurrent.DefaultThreadFactory; /** * Created to test perf of FastThreadLocal @@ -33,64 +30,10 @@ import io.netty.util.concurrent.FastThreadLocalThread; * Used in MutationBench via: * jvmArgsAppend = {"-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"} */ -public class FastThreadExecutor extends AbstractExecutorService +public class FastThreadExecutor extends ThreadPoolExecutor { - final FastThreadLocalThread thread; - final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); - final CountDownLatch shutdown = new CountDownLatch(1); - public FastThreadExecutor(int size, String name) { - assert size == 1; - - thread = new FastThreadLocalThread(() -> { - Runnable work = null; - try - { - while ((work = queue.take()) != null) - work.run(); - } - catch (InterruptedException e) - { - shutdown.countDown(); - } - }); - - thread.setName(name + "-1"); - thread.setDaemon(true); - - thread.start(); - } - - - public void shutdown() - { - thread.interrupt(); - } - - public List shutdownNow() - { - thread.interrupt(); - return Collections.emptyList(); - } - - public boolean isShutdown() - { - return shutdown.getCount() == 0; - } - - public boolean isTerminated() - { - return shutdown.getCount() == 0; - } - - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException - { - return shutdown.await(timeout, unit); - } - - public void execute(Runnable command) - { - while(!queue.add(command)); + super(size, size, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory(name, true)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java ---------------------------------------------------------------------- diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java new file mode 100644 index 0000000..491dc44 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.util.concurrent.TimeUnit; + +import io.netty.util.concurrent.FastThreadLocal; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1,jvmArgsAppend = {"-Xmx512M", "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}) +@Threads(4) // make sure this matches the number of _physical_cores_ +@State(Scope.Benchmark) +public class FastThreadLocalBench +{ + @Param({"2", "4", "8", "12"}) + private int variables = 2; + + static final int max = 20; + static final ThreadLocal[] threadLocals = new ThreadLocal[max]; + static final FastThreadLocal[] fastThreadLocals = new FastThreadLocal[max]; + static + { + for (int i = 0; i < max; i++) + { + threadLocals[i] = ThreadLocal.withInitial(Object::new); + fastThreadLocals[i] = new FastThreadLocal() { + protected Object initialValue() throws Exception + { + return new Object(); + } + }; + } + } + + @State(Scope.Thread) + public static class FastThreadLocalBenchState + { + public int index; + } + + @Benchmark + public void baseline(FastThreadLocalBenchState state, Blackhole bh) + { + if (variables != 2) + throw new IllegalArgumentException("skipped"); + + bh.consume("foo"); + } + + @Benchmark + public void threadLocal(FastThreadLocalBenchState state, Blackhole bh) + { + bh.consume(threadLocals[state.index % max].get()); + } + + @Benchmark + public void fastThreadLocal(FastThreadLocalBenchState state, Blackhole bh) + { + bh.consume(fastThreadLocals[state.index % max].get()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index a4173d6..eca124f 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -32,6 +32,7 @@ import java.util.List; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.Pair; @@ -135,7 +136,7 @@ public class CacheProviderTest List threads = new ArrayList<>(100); for (int i = 0; i < 100; i++) { - Thread thread = new Thread(runnable); + Thread thread = NamedThreadFactory.createThread(runnable); threads.add(thread); thread.start(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java index fdc6880..ac2a9c0 100644 --- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java +++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java @@ -44,7 +44,7 @@ public class WaitQueueTest final AtomicInteger ready = new AtomicInteger(); Thread[] ts = new Thread[4]; for (int i = 0 ; i < ts.length ; i++) - ts[i] = new Thread(new Runnable() + ts[i] = NamedThreadFactory.createThread(new Runnable() { @Override public void run() @@ -84,7 +84,7 @@ public class WaitQueueTest final AtomicBoolean ready = new AtomicBoolean(false); final AtomicBoolean condition = new AtomicBoolean(false); final AtomicBoolean fail = new AtomicBoolean(false); - Thread t = new Thread(new Runnable() + Thread t = NamedThreadFactory.createThread(new Runnable() { @Override public void run() http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java index cbc412d..f5bda4f 100644 --- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java +++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java @@ -34,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.Util; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; @@ -156,20 +157,17 @@ public class RecoveryManagerTest Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty()); final AtomicReference err = new AtomicReference(); - Thread t = new Thread() { - @Override - public void run() + Thread t = NamedThreadFactory.createThread(() -> + { + try { - try - { - CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL - } - catch (Throwable t) - { - err.set(t); - } + CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL } - }; + catch (Throwable x) + { + err.set(x); + } + }); t.start(); Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS)); Thread.sleep(100); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/hints/HintsBufferTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java index 78ea4f4..08f7ec0 100644 --- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java +++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java @@ -28,6 +28,7 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Mutation; @@ -114,7 +115,7 @@ public class HintsBufferTest // create HINT_THREADS_COUNT, start them, and wait for them to finish List threads = new ArrayList<>(HINT_THREADS_COUNT); for (int i = 0; i < HINT_THREADS_COUNT; i ++) - threads.add(new Thread(new Writer(buffer, load, hintSize, i, baseTimestamp))); + threads.add(NamedThreadFactory.createThread(new Writer(buffer, load, hintSize, i, baseTimestamp))); threads.forEach(java.lang.Thread::start); for (Thread thread : threads) thread.join(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java index 9737281..f287912 100644 --- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.RowUpdateBuilder; @@ -619,7 +620,7 @@ public class IndexSummaryManagerTest // barrier to control when redistribution runs final CountDownLatch barrier = new CountDownLatch(1); - Thread t = new Thread(new Runnable() + Thread t = NamedThreadFactory.createThread(new Runnable() { public void run() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 942ebe9..d1b4092 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -33,6 +33,7 @@ import org.junit.Test; import org.apache.cassandra.Util; import org.apache.cassandra.UpdateBuilder; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; @@ -813,7 +814,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } } }; - Thread t = new Thread(r); + Thread t = NamedThreadFactory.createThread(r); try { t.start(); @@ -895,7 +896,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase } } }; - Thread t = new Thread(r); + Thread t = NamedThreadFactory.createThread(r); try { t.start(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/service/RemoveTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java index 0ef9b9c..701ea0f 100644 --- a/test/unit/org/apache/cassandra/service/RemoveTest.java +++ b/test/unit/org/apache/cassandra/service/RemoveTest.java @@ -32,6 +32,7 @@ import org.junit.*; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.RandomPartitioner; @@ -141,23 +142,20 @@ public class RemoveTest { // start removal in background and send replication confirmations final AtomicBoolean success = new AtomicBoolean(false); - Thread remover = new Thread() + Thread remover = NamedThreadFactory.createThread(() -> { - public void run() + try { - try - { - ss.removeNode(removalId.toString()); - } - catch (Exception e) - { - System.err.println(e); - e.printStackTrace(); - return; - } - success.set(true); + ss.removeNode(removalId.toString()); } - }; + catch (Exception e) + { + System.err.println(e); + e.printStackTrace(); + return; + } + success.set(true); + }); remover.start(); Thread.sleep(1000); // make sure removal is waiting for confirmation http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java index bb6e3a8..42aef0c 100644 --- a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java +++ b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import com.clearspring.analytics.hash.MurmurHash; import com.clearspring.analytics.stream.Counter; import junit.framework.Assert; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.utils.TopKSampler.SamplerResult; public class TopKSamplerTest @@ -82,7 +83,7 @@ public class TopKSamplerTest final CountDownLatch latch = new CountDownLatch(1); final TopKSampler sampler = new TopKSampler(); - new Thread(new Runnable() + NamedThreadFactory.createThread(new Runnable() { public void run() { @@ -99,7 +100,7 @@ public class TopKSamplerTest } } - ,"inserter").start(); + , "inserter").start(); try { // start/stop in fast iterations http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/StressServer.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java index 793f8f0..c00fb54 100644 --- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java +++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java @@ -23,9 +23,11 @@ import java.io.PrintStream; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli.*; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.stress.settings.StressSettings; import org.apache.cassandra.stress.util.MultiResultLogger; import org.apache.cassandra.stress.util.ResultLogger; @@ -39,6 +41,8 @@ public class StressServer availableOptions.addOption("h", "host", true, "Host to listen for connections."); } + private static final AtomicInteger threadCounter = new AtomicInteger(1); + public static void main(String[] args) throws Exception { ServerSocket serverSocket = null; @@ -93,7 +97,7 @@ public class StressServer ResultLogger log = new MultiResultLogger(out); StressAction action = new StressAction((StressSettings) in.readObject(), log); - Thread actionThread = new Thread(action); + Thread actionThread = NamedThreadFactory.createThread(action, "stress-" + threadCounter.incrementAndGet()); actionThread.start(); while (actionThread.isAlive()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java ---------------------------------------------------------------------- diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java index f561f61..ff8b27f 100644 --- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java +++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java @@ -35,6 +35,7 @@ import com.datastax.driver.core.Statement; import com.datastax.driver.core.TableMetadata; import com.datastax.driver.core.Token; import com.datastax.driver.core.TokenRange; +import io.netty.util.concurrent.FastThreadLocal; import org.apache.cassandra.stress.Operation; import org.apache.cassandra.stress.StressYaml; import org.apache.cassandra.stress.WorkManager; @@ -46,7 +47,7 @@ import org.apache.cassandra.stress.util.ThriftClient; public class TokenRangeQuery extends Operation { - private final ThreadLocal currentState = new ThreadLocal<>(); + private final FastThreadLocal currentState = new FastThreadLocal<>(); private final TableMetadata tableMetadata; private final TokenRangeIterator tokenRangeIterator;