cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/7] cassandra git commit: Move to FastThreadLocalThread and FastThreadLocal
Date Sun, 05 Feb 2017 17:02:02 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 f71e7e1fe -> 8fc72a5eb


Move to FastThreadLocalThread and FastThreadLocal

patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-13034


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cecbe17e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cecbe17e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cecbe17e

Branch: refs/heads/cassandra-3.11
Commit: cecbe17e3eafc052acc13950494f7dddf026aa54
Parents: f71e7e1
Author: Robert Stupp <snazy@snazy.de>
Authored: Tue Dec 13 17:37:09 2016 +0100
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Sun Feb 5 16:54:19 2017 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../concurrent/NamedThreadFactory.java          | 42 ++++++++-
 .../functions/ThreadAwareSecurityManager.java   |  3 +-
 .../AbstractCommitLogSegmentManager.java        |  2 +-
 .../db/commitlog/AbstractCommitLogService.java  |  2 +-
 .../db/compaction/CompactionManager.java        |  3 +-
 .../apache/cassandra/db/marshal/AsciiType.java  |  3 +-
 .../hints/EncryptedChecksummedDataInput.java    |  3 +-
 .../cassandra/index/sasi/TermIterator.java      |  5 +-
 .../io/compress/DeflateCompressor.java          | 11 +--
 .../cassandra/net/OutboundTcpConnection.java    |  5 +-
 .../apache/cassandra/repair/RepairRunnable.java |  6 +-
 .../scheduler/RoundRobinScheduler.java          |  2 +-
 .../cassandra/security/CipherFactory.java       |  3 +-
 .../cassandra/security/EncryptionUtils.java     |  3 +-
 .../serializers/TimestampSerializer.java        |  7 +-
 .../cassandra/service/StorageService.java       |  8 +-
 .../cassandra/thrift/ThriftSessionManager.java  |  4 +-
 .../cassandra/utils/CoalescingStrategies.java   | 25 +++---
 .../org/apache/cassandra/utils/FBUtilities.java | 15 ++--
 .../org/apache/cassandra/cql3/ViewLongTest.java |  3 +-
 .../test/microbench/FastThreadExecutor.java     | 65 +-------------
 .../test/microbench/FastThreadLocalBench.java   | 92 ++++++++++++++++++++
 .../cassandra/cache/CacheProviderTest.java      |  3 +-
 .../cassandra/concurrent/WaitQueueTest.java     |  4 +-
 .../cassandra/db/RecoveryManagerTest.java       | 22 +++--
 .../apache/cassandra/hints/HintsBufferTest.java |  3 +-
 .../io/sstable/IndexSummaryManagerTest.java     |  3 +-
 .../io/sstable/SSTableRewriterTest.java         |  5 +-
 .../apache/cassandra/service/RemoveTest.java    | 26 +++---
 .../apache/cassandra/utils/TopKSamplerTest.java |  5 +-
 .../apache/cassandra/stress/StressServer.java   |  6 +-
 .../operations/userdefined/TokenRangeQuery.java |  3 +-
 33 files changed, 239 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0836475..7c1cd82 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,10 +1,12 @@
 3.11.0
+ * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 Merged from 3.0:
  * Fix handling of partition with partition-level deletion plus
    live rows in sstabledump (CASSANDRA-13177)
  * Provide user workaround when system_schema.columns does not contain entries
    for a table that's in system_schema.tables (CASSANDRA-13180)
 
+
 3.10
  * Fix secondary index queries regression (CASSANDRA-13013)
  * Add duration type to the protocol V5 (CASSANDRA-12850)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 22193c4..5d89f6c 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.concurrent;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.util.concurrent.FastThreadLocal;
 import io.netty.util.concurrent.FastThreadLocalThread;
 
@@ -58,9 +60,8 @@ public class NamedThreadFactory implements ThreadFactory
     public Thread newThread(Runnable runnable)
     {
         String name = id + ':' + n.getAndIncrement();
-        Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
+        Thread thread = createThread(threadGroup, runnable, name, true);
         thread.setPriority(priority);
-        thread.setDaemon(true);
         if (contextClassLoader != null)
             thread.setContextClassLoader(contextClassLoader);
         return thread;
@@ -75,11 +76,44 @@ public class NamedThreadFactory implements ThreadFactory
     {
         return () ->
         {
-            try {
+            try
+            {
                 r.run();
-            } finally {
+            }
+            finally
+            {
                 FastThreadLocal.removeAll();
             }
         };
     }
+
+    private static final AtomicInteger threadCounter = new AtomicInteger();
+
+    @VisibleForTesting
+    public static Thread createThread(Runnable runnable)
+    {
+        return createThread(null, runnable, "anonymous-" + threadCounter.incrementAndGet());
+    }
+
+    public static Thread createThread(Runnable runnable, String name)
+    {
+        return createThread(null, runnable, name);
+    }
+
+    public static Thread createThread(Runnable runnable, String name, boolean daemon)
+    {
+        return createThread(null, runnable, name, daemon);
+    }
+
+    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name)
+    {
+        return createThread(threadGroup, runnable, name, false);
+    }
+
+    public static Thread createThread(ThreadGroup threadGroup, Runnable runnable, String name, boolean daemon)
+    {
+        Thread thread = new FastThreadLocalThread(threadGroup, threadLocalDeallocator(runnable), name);
+        thread.setDaemon(daemon);
+        return thread;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
index 676117d..2e4bb4d 100644
--- a/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
+++ b/src/java/org/apache/cassandra/cql3/functions/ThreadAwareSecurityManager.java
@@ -36,6 +36,7 @@ import ch.qos.logback.classic.LoggerContext;
 import ch.qos.logback.classic.spi.TurboFilterList;
 import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
 import ch.qos.logback.classic.turbo.TurboFilter;
+import io.netty.util.concurrent.FastThreadLocal;
 
 /**
  * Custom {@link SecurityManager} and {@link Policy} implementation that only performs access checks
@@ -175,7 +176,7 @@ public final class ThreadAwareSecurityManager extends SecurityManager
         });
     }
 
-    private static final ThreadLocal<Boolean> initializedThread = new ThreadLocal<>();
+    private static final FastThreadLocal<Boolean> initializedThread = new FastThreadLocal<>();
 
     private ThreadAwareSecurityManager()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
index eff35f4..0ab941b 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogSegmentManager.java
@@ -147,7 +147,7 @@ public abstract class AbstractCommitLogSegmentManager
         };
 
         shutdown = false;
-        managerThread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "COMMIT-LOG-ALLOCATOR");
+        managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR");
         managerThread.start();
 
         // for simplicity, ensure the first segment is allocated before continuing

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
index 834aa0d..71100a3 100644
--- a/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
+++ b/src/java/org/apache/cassandra/db/commitlog/AbstractCommitLogService.java
@@ -150,7 +150,7 @@ public abstract class AbstractCommitLogService
         };
 
         shutdown = false;
-        thread = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), name);
+        thread = NamedThreadFactory.createThread(runnable, name);
         thread.start();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f34b8a9..f16c1de 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -36,6 +36,7 @@ import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.cache.AutoSavingCache;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
@@ -93,7 +94,7 @@ public class CompactionManager implements CompactionManagerMBean
 
     // A thread local that tells us if the current thread is owned by the compaction manager. Used
     // by CounterContext to figure out if it should log a warning for invalid counter shards.
-    public static final ThreadLocal<Boolean> isCompactionManager = new ThreadLocal<Boolean>()
+    public static final FastThreadLocal<Boolean> isCompactionManager = new FastThreadLocal<Boolean>()
     {
         @Override
         protected Boolean initialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/db/marshal/AsciiType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/AsciiType.java b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
index 089e388..3cd45de 100644
--- a/src/java/org/apache/cassandra/db/marshal/AsciiType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AsciiType.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
 import java.nio.charset.CharsetEncoder;
 import java.nio.charset.CharacterCodingException;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.cql3.Constants;
 import org.apache.cassandra.cql3.Json;
 
@@ -40,7 +41,7 @@ public class AsciiType extends AbstractType<String>
 
     AsciiType() {super(ComparisonType.BYTE_ORDER);} // singleton
 
-    private final ThreadLocal<CharsetEncoder> encoder = new ThreadLocal<CharsetEncoder>()
+    private final FastThreadLocal<CharsetEncoder> encoder = new FastThreadLocal<CharsetEncoder>()
     {
         @Override
         protected CharsetEncoder initialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
index b335226..b01161d 100644
--- a/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
+++ b/src/java/org/apache/cassandra/hints/EncryptedChecksummedDataInput.java
@@ -23,6 +23,7 @@ import javax.crypto.Cipher;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.security.EncryptionUtils;
 import org.apache.cassandra.hints.CompressedChecksummedDataInput.Position;
 import org.apache.cassandra.io.FSReadError;
@@ -31,7 +32,7 @@ import org.apache.cassandra.io.util.ChannelProxy;
 
 public class EncryptedChecksummedDataInput extends ChecksummedDataInput
 {
-    private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+    private static final FastThreadLocal<ByteBuffer> reusableBuffers = new FastThreadLocal<ByteBuffer>()
     {
         protected ByteBuffer initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/index/sasi/TermIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/sasi/TermIterator.java b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
index 1ddfcb9..03dea18 100644
--- a/src/java/org/apache/cassandra/index/sasi/TermIterator.java
+++ b/src/java/org/apache/cassandra/index/sasi/TermIterator.java
@@ -23,6 +23,7 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.index.sasi.disk.OnDiskIndexBuilder;
@@ -42,7 +43,7 @@ public class TermIterator extends RangeIterator<Long, Token>
 {
     private static final Logger logger = LoggerFactory.getLogger(TermIterator.class);
 
-    private static final ThreadLocal<ExecutorService> SEARCH_EXECUTOR = new ThreadLocal<ExecutorService>()
+    private static final FastThreadLocal<ExecutorService> SEARCH_EXECUTOR = new FastThreadLocal<ExecutorService>()
     {
         public ExecutorService initialValue()
         {
@@ -59,7 +60,7 @@ public class TermIterator extends RangeIterator<Long, Token>
 
                 public Thread newThread(Runnable task)
                 {
-                    return new Thread(NamedThreadFactory.threadLocalDeallocator(task), currentThread + "-SEARCH-" + count.incrementAndGet()) {{ setDaemon(true); }};
+                    return NamedThreadFactory.createThread(task, currentThread + "-SEARCH-" + count.incrementAndGet(), true);
                 }
             });
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
index a2ed65c..8557f5f 100644
--- a/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/DeflateCompressor.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.compress;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.schema.CompressionParams;
 
 import java.io.IOException;
@@ -32,7 +33,7 @@ public class DeflateCompressor implements ICompressor
 {
     public static final DeflateCompressor instance = new DeflateCompressor();
 
-    private static final ThreadLocal<byte[]> threadLocalScratchBuffer = new ThreadLocal<byte[]>()
+    private static final FastThreadLocal<byte[]> threadLocalScratchBuffer = new FastThreadLocal<byte[]>()
     {
         @Override
         protected byte[] initialValue()
@@ -46,8 +47,8 @@ public class DeflateCompressor implements ICompressor
         return threadLocalScratchBuffer.get();
     }
 
-    private final ThreadLocal<Deflater> deflater;
-    private final ThreadLocal<Inflater> inflater;
+    private final FastThreadLocal<Deflater> deflater;
+    private final FastThreadLocal<Inflater> inflater;
 
     public static DeflateCompressor create(Map<String, String> compressionOptions)
     {
@@ -57,7 +58,7 @@ public class DeflateCompressor implements ICompressor
 
     private DeflateCompressor()
     {
-        deflater = new ThreadLocal<Deflater>()
+        deflater = new FastThreadLocal<Deflater>()
         {
             @Override
             protected Deflater initialValue()
@@ -65,7 +66,7 @@ public class DeflateCompressor implements ICompressor
                 return new Deflater();
             }
         };
-        inflater = new ThreadLocal<Inflater>()
+        inflater = new FastThreadLocal<Inflater>()
         {
             @Override
             protected Inflater initialValue()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1843e7b..0693ac3 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -503,7 +503,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
     {
         final AtomicInteger version = new AtomicInteger(NO_VERSION);
         final CountDownLatch versionLatch = new CountDownLatch(1);
-        Runnable r = () ->
+        NamedThreadFactory.createThread(() ->
         {
             try
             {
@@ -523,8 +523,7 @@ public class OutboundTcpConnection extends FastThreadLocalThread
                 //unblock the waiting thread on either success or fail
                 versionLatch.countDown();
             }
-        };
-        new Thread(NamedThreadFactory.threadLocalDeallocator(r), "HANDSHAKE-" + poolReference.endPoint()).start();
+        }, "HANDSHAKE-" + poolReference.endPoint()).start();
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 013dcec..c9eed54 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -73,6 +73,8 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
     private final List<ProgressListener> listeners = new ArrayList<>();
 
+    private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
     public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace)
     {
         this.storageService = storageService;
@@ -376,7 +378,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
 
     private Thread createQueryThread(final int cmd, final UUID sessionId)
     {
-        return new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
+        return NamedThreadFactory.createThread(new WrappedRunnable()
         {
             // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
             // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
@@ -443,6 +445,6 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                     seen[si].clear();
                 }
             }
-        }));
+        }, "Repair-Runnable-" + threadCounter.incrementAndGet());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
index 904deb3..fd967f3 100644
--- a/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
+++ b/src/java/org/apache/cassandra/scheduler/RoundRobinScheduler.java
@@ -67,7 +67,7 @@ public class RoundRobinScheduler implements IRequestScheduler
                 schedule();
             }
         };
-        Thread scheduler = new Thread(NamedThreadFactory.threadLocalDeallocator(runnable), "REQUEST-SCHEDULER");
+        Thread scheduler = NamedThreadFactory.createThread(runnable, "REQUEST-SCHEDULER");
         scheduler.start();
         logger.info("Started the RoundRobin Request Scheduler");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/CipherFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java
index 7c1495a..3f5c5f3 100644
--- a/src/java/org/apache/cassandra/security/CipherFactory.java
+++ b/src/java/org/apache/cassandra/security/CipherFactory.java
@@ -39,6 +39,7 @@ import com.google.common.cache.RemovalNotification;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.config.TransparentDataEncryptionOptions;
 
 /**
@@ -54,7 +55,7 @@ public class CipherFactory
      * Bonus points if you can avoid calling (@code Cipher#init); hence, the point of the supporting struct
      * for caching Cipher instances.
      */
-    private static final ThreadLocal<CachedCipher> cipherThreadLocal = new ThreadLocal<>();
+    private static final FastThreadLocal<CachedCipher> cipherThreadLocal = new FastThreadLocal<>();
 
     private final SecureRandom secureRandom;
     private final LoadingCache<String, Key> cache;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/security/EncryptionUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/EncryptionUtils.java b/src/java/org/apache/cassandra/security/EncryptionUtils.java
index bb61260..855e2a9 100644
--- a/src/java/org/apache/cassandra/security/EncryptionUtils.java
+++ b/src/java/org/apache/cassandra/security/EncryptionUtils.java
@@ -28,6 +28,7 @@ import javax.crypto.ShortBufferException;
 
 import com.google.common.base.Preconditions;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.db.commitlog.EncryptedSegment;
 import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
@@ -45,7 +46,7 @@ public class EncryptionUtils
     public static final int COMPRESSED_BLOCK_HEADER_SIZE = 4;
     public static final int ENCRYPTED_BLOCK_HEADER_SIZE = 8;
 
-    private static final ThreadLocal<ByteBuffer> reusableBuffers = new ThreadLocal<ByteBuffer>()
+    private static final FastThreadLocal<ByteBuffer> reusableBuffers = new FastThreadLocal<ByteBuffer>()
     {
         protected ByteBuffer initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
index a4a6f80..ac75d4b 100644
--- a/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
+++ b/src/java/org/apache/cassandra/serializers/TimestampSerializer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.serializers;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import java.nio.ByteBuffer;
@@ -90,7 +91,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
     private static final String DEFAULT_FORMAT = dateStringPatterns[6];
     private static final Pattern timestampPattern = Pattern.compile("^-?\\d+$");
 
-    private static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>()
+    private static final FastThreadLocal<SimpleDateFormat> FORMATTER = new FastThreadLocal<SimpleDateFormat>()
     {
         protected SimpleDateFormat initialValue()
         {
@@ -99,7 +100,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
     };
 
     private static final String UTC_FORMAT = dateStringPatterns[40];
-    private static final ThreadLocal<SimpleDateFormat> FORMATTER_UTC = new ThreadLocal<SimpleDateFormat>()
+    private static final FastThreadLocal<SimpleDateFormat> FORMATTER_UTC = new FastThreadLocal<SimpleDateFormat>()
     {
         protected SimpleDateFormat initialValue()
         {
@@ -110,7 +111,7 @@ public class TimestampSerializer implements TypeSerializer<Date>
     };
 
     private static final String TO_JSON_FORMAT = dateStringPatterns[19];
-    private static final ThreadLocal<SimpleDateFormat> FORMATTER_TO_JSON = new ThreadLocal<SimpleDateFormat>()
+    private static final FastThreadLocal<SimpleDateFormat> FORMATTER_TO_JSON = new FastThreadLocal<SimpleDateFormat>()
     {
         protected SimpleDateFormat initialValue()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 720269f..b64cf13 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -126,6 +126,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     @Deprecated
     private final LegacyJMXProgressSupport legacyProgressSupport;
 
+    private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
     private static int getRingDelay()
     {
         String newdelay = System.getProperty("cassandra.ring_delay_ms");
@@ -634,7 +636,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
 
         // daemon threads, like our executors', continue to run while shutdown hooks are invoked
-        drainOnShutdown = new Thread(NamedThreadFactory.threadLocalDeallocator(new WrappedRunnable()
+        drainOnShutdown = NamedThreadFactory.createThread(new WrappedRunnable()
         {
             @Override
             public void runMayThrow() throws InterruptedException, ExecutionException, IOException
@@ -649,7 +651,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 logbackHook.setContext((LoggerContext)LoggerFactory.getILoggerFactory());
                 logbackHook.run();
             }
-        }), "StorageServiceShutdownHook");
+        }, "StorageServiceShutdownHook");
         Runtime.getRuntime().addShutdownHook(drainOnShutdown);
 
         replacing = isReplacing();
@@ -3552,7 +3554,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return 0;
 
         int cmd = nextRepairCommand.incrementAndGet();
-        new Thread(NamedThreadFactory.threadLocalDeallocator(createRepairTask(cmd, keyspace, options, legacy))).start();
+        NamedThreadFactory.createThread(createRepairTask(cmd, keyspace, options, legacy), "Repair-Task-" + threadCounter.incrementAndGet()).start();
         return cmd;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
index 7d22507..60da3b4 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftSessionManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
 /**
  * Encapsulates the current client state (session).
  *
@@ -35,7 +37,7 @@ public class ThriftSessionManager
     private static final Logger logger = LoggerFactory.getLogger(ThriftSessionManager.class);
     public final static ThriftSessionManager instance = new ThriftSessionManager();
 
-    private final ThreadLocal<SocketAddress> remoteSocket = new ThreadLocal<>();
+    private final FastThreadLocal<SocketAddress> remoteSocket = new FastThreadLocal<>();
     private final ConcurrentHashMap<SocketAddress, ThriftClientState> activeSocketSessions = new ConcurrentHashMap<>();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
index 16a64df..0aa980f 100644
--- a/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
+++ b/src/java/org/apache/cassandra/utils/CoalescingStrategies.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.utils;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.io.util.FileUtils;
 import org.slf4j.Logger;
@@ -127,25 +128,21 @@ public class CoalescingStrategies
             this.displayName = displayName;
             if (DEBUG_COALESCING)
             {
-                new Thread(displayName + " debug thread")
+                NamedThreadFactory.createThread(() ->
                 {
-                    @Override
-                    public void run()
+                    while (true)
                     {
-                        while (true)
+                        try
                         {
-                            try
-                            {
-                                Thread.sleep(5000);
-                            }
-                            catch (InterruptedException e)
-                            {
-                                throw new AssertionError();
-                            }
-                            shouldLogAverage = true;
+                            Thread.sleep(5000);
                         }
+                        catch (InterruptedException e)
+                        {
+                            throw new AssertionError();
+                        }
+                        shouldLogAverage = true;
                     }
-                }.start();
+                }, displayName + " debug thread").start();
             }
             RandomAccessFile rasTemp = null;
             ByteBuffer logBufferTemp = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java
index a925c0e..2138ea5 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.IAuthorizer;
 import org.apache.cassandra.auth.IRoleManager;
@@ -87,28 +88,22 @@ public class FBUtilities
             return Runtime.getRuntime().availableProcessors();
     }
 
-    private static final ThreadLocal<MessageDigest> localMD5Digest = new ThreadLocal<MessageDigest>()
+    private static final FastThreadLocal<MessageDigest> localMD5Digest = new FastThreadLocal<MessageDigest>()
     {
         @Override
         protected MessageDigest initialValue()
         {
             return newMessageDigest("MD5");
         }
-
-        @Override
-        public MessageDigest get()
-        {
-            MessageDigest digest = super.get();
-            digest.reset();
-            return digest;
-        }
     };
 
     public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
 
     public static MessageDigest threadLocalMD5Digest()
     {
-        return localMD5Digest.get();
+        MessageDigest md = localMD5Digest.get();
+        md.reset();
+        return md;
     }
 
     public static MessageDigest newMessageDigest(String algorithm)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/long/org/apache/cassandra/cql3/ViewLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/cql3/ViewLongTest.java b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
index a5d17ea..590f148 100644
--- a/test/long/org/apache/cassandra/cql3/ViewLongTest.java
+++ b/test/long/org/apache/cassandra/cql3/ViewLongTest.java
@@ -33,6 +33,7 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.WriteTimeoutException;
 import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.WrappedRunnable;
 
@@ -90,7 +91,7 @@ public class ViewLongTest extends CQLTester
         for (int i = 0; i < writers; i++)
         {
             final int writer = i;
-            Thread t = new Thread(new WrappedRunnable()
+            Thread t = NamedThreadFactory.createThread(new WrappedRunnable()
             {
                 public void runMayThrow()
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
index d0b4442..5644e4f 100644
--- a/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
+++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadExecutor.java
@@ -18,14 +18,11 @@
 
 package org.apache.cassandra.test.microbench;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.AbstractExecutorService;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-import io.netty.util.concurrent.FastThreadLocalThread;
+import io.netty.util.concurrent.DefaultThreadFactory;
 
 /**
  * Created to test perf of FastThreadLocal
@@ -33,64 +30,10 @@ import io.netty.util.concurrent.FastThreadLocalThread;
  * Used in MutationBench via:
  * jvmArgsAppend = {"-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"}
  */
-public class FastThreadExecutor extends AbstractExecutorService
+public class FastThreadExecutor extends ThreadPoolExecutor
 {
-    final FastThreadLocalThread thread;
-    final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
-    final CountDownLatch shutdown = new CountDownLatch(1);
-
     public FastThreadExecutor(int size, String name)
     {
-        assert size == 1;
-
-        thread = new FastThreadLocalThread(() -> {
-            Runnable work = null;
-            try
-            {
-                while ((work = queue.take()) != null)
-                    work.run();
-            }
-            catch (InterruptedException e)
-            {
-                shutdown.countDown();
-            }
-        });
-
-        thread.setName(name + "-1");
-        thread.setDaemon(true);
-
-        thread.start();
-    }
-
-
-    public void shutdown()
-    {
-        thread.interrupt();
-    }
-
-    public List<Runnable> shutdownNow()
-    {
-        thread.interrupt();
-        return Collections.emptyList();
-    }
-
-    public boolean isShutdown()
-    {
-        return shutdown.getCount() == 0;
-    }
-
-    public boolean isTerminated()
-    {
-        return shutdown.getCount() == 0;
-    }
-
-    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
-    {
-        return shutdown.await(timeout, unit);
-    }
-
-    public void execute(Runnable command)
-    {
-        while(!queue.add(command));
+        super(size, size, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new DefaultThreadFactory(name, true));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
----------------------------------------------------------------------
diff --git a/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
new file mode 100644
index 0000000..491dc44
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/FastThreadLocalBench.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.util.concurrent.TimeUnit;
+
+import io.netty.util.concurrent.FastThreadLocal;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@Warmup(iterations = 3, time = 1, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 5, time = 2, timeUnit = TimeUnit.SECONDS)
+@Fork(value = 1,jvmArgsAppend = {"-Xmx512M", "-Djmh.executor=CUSTOM", "-Djmh.executor.class=org.apache.cassandra.test.microbench.FastThreadExecutor"})
+@Threads(4) // make sure this matches the number of _physical_cores_
+@State(Scope.Benchmark)
+public class FastThreadLocalBench
+{
+    @Param({"2", "4", "8", "12"})
+    private int variables = 2;
+
+    static final int max = 20;
+    static final ThreadLocal[] threadLocals = new ThreadLocal[max];
+    static final FastThreadLocal[] fastThreadLocals = new FastThreadLocal[max];
+    static
+    {
+        for (int i = 0; i < max; i++)
+        {
+            threadLocals[i] = ThreadLocal.withInitial(Object::new);
+            fastThreadLocals[i] = new FastThreadLocal() {
+                protected Object initialValue() throws Exception
+                {
+                    return new Object();
+                }
+            };
+        }
+    }
+
+    @State(Scope.Thread)
+    public static class FastThreadLocalBenchState
+    {
+        public int index;
+    }
+
+    @Benchmark
+    public void baseline(FastThreadLocalBenchState state, Blackhole bh)
+    {
+        if (variables != 2)
+            throw new IllegalArgumentException("skipped");
+
+        bh.consume("foo");
+    }
+
+    @Benchmark
+    public void threadLocal(FastThreadLocalBenchState state, Blackhole bh)
+    {
+        bh.consume(threadLocals[state.index % max].get());
+    }
+
+    @Benchmark
+    public void fastThreadLocal(FastThreadLocalBenchState state, Blackhole bh)
+    {
+        bh.consume(fastThreadLocals[state.index % max].get());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index a4173d6..eca124f 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -32,6 +32,7 @@ import java.util.List;
 
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.Pair;
 
@@ -135,7 +136,7 @@ public class CacheProviderTest
         List<Thread> threads = new ArrayList<>(100);
         for (int i = 0; i < 100; i++)
         {
-            Thread thread = new Thread(runnable);
+            Thread thread = NamedThreadFactory.createThread(runnable);
             threads.add(thread);
             thread.start();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
index fdc6880..ac2a9c0 100644
--- a/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
+++ b/test/unit/org/apache/cassandra/concurrent/WaitQueueTest.java
@@ -44,7 +44,7 @@ public class WaitQueueTest
         final AtomicInteger ready = new AtomicInteger();
         Thread[] ts = new Thread[4];
         for (int i = 0 ; i < ts.length ; i++)
-            ts[i] = new Thread(new Runnable()
+            ts[i] = NamedThreadFactory.createThread(new Runnable()
         {
             @Override
             public void run()
@@ -84,7 +84,7 @@ public class WaitQueueTest
         final AtomicBoolean ready = new AtomicBoolean(false);
         final AtomicBoolean condition = new AtomicBoolean(false);
         final AtomicBoolean fail = new AtomicBoolean(false);
-        Thread t = new Thread(new Runnable()
+        Thread t = NamedThreadFactory.createThread(new Runnable()
         {
             @Override
             public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
index cbc412d..f5bda4f 100644
--- a/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
+++ b/test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
@@ -34,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.ParameterizedClass;
@@ -156,20 +157,17 @@ public class RecoveryManagerTest
             Assert.assertTrue(Util.getAllUnfiltered(Util.cmd(keyspace2.getColumnFamilyStore(CF_STANDARD3), dk).build()).isEmpty());
 
             final AtomicReference<Throwable> err = new AtomicReference<Throwable>();
-            Thread t = new Thread() {
-                @Override
-                public void run()
+            Thread t = NamedThreadFactory.createThread(() ->
+            {
+                try
                 {
-                    try
-                    {
-                        CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
-                    }
-                    catch (Throwable t)
-                    {
-                        err.set(t);
-                    }
+                    CommitLog.instance.resetUnsafe(false); // disassociate segments from live CL
                 }
-            };
+                catch (Throwable x)
+                {
+                    err.set(x);
+                }
+            });
             t.start();
             Assert.assertTrue(mockInitiator.blocked.tryAcquire(1, 20, TimeUnit.SECONDS));
             Thread.sleep(100);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
index 78ea4f4..08f7ec0 100644
--- a/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
+++ b/test/unit/org/apache/cassandra/hints/HintsBufferTest.java
@@ -28,6 +28,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Mutation;
@@ -114,7 +115,7 @@ public class HintsBufferTest
         // create HINT_THREADS_COUNT, start them, and wait for them to finish
         List<Thread> threads = new ArrayList<>(HINT_THREADS_COUNT);
         for (int i = 0; i < HINT_THREADS_COUNT; i ++)
-            threads.add(new Thread(new Writer(buffer, load, hintSize, i, baseTimestamp)));
+            threads.add(NamedThreadFactory.createThread(new Writer(buffer, load, hintSize, i, baseTimestamp)));
         threads.forEach(java.lang.Thread::start);
         for (Thread thread : threads)
             thread.join();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
index 9737281..f287912 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.RowUpdateBuilder;
@@ -619,7 +620,7 @@ public class IndexSummaryManagerTest
         // barrier to control when redistribution runs
         final CountDownLatch barrier = new CountDownLatch(1);
 
-        Thread t = new Thread(new Runnable()
+        Thread t = NamedThreadFactory.createThread(new Runnable()
         {
             public void run()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 942ebe9..d1b4092 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -33,6 +33,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -813,7 +814,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 }
             }
         };
-        Thread t = new Thread(r);
+        Thread t = NamedThreadFactory.createThread(r);
         try
         {
             t.start();
@@ -895,7 +896,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase
                 }
             }
         };
-        Thread t = new Thread(r);
+        Thread t = NamedThreadFactory.createThread(r);
         try
         {
             t.start();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 0ef9b9c..701ea0f 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -32,6 +32,7 @@ import org.junit.*;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
@@ -141,23 +142,20 @@ public class RemoveTest
     {
         // start removal in background and send replication confirmations
         final AtomicBoolean success = new AtomicBoolean(false);
-        Thread remover = new Thread()
+        Thread remover = NamedThreadFactory.createThread(() ->
         {
-            public void run()
+            try
             {
-                try
-                {
-                    ss.removeNode(removalId.toString());
-                }
-                catch (Exception e)
-                {
-                    System.err.println(e);
-                    e.printStackTrace();
-                    return;
-                }
-                success.set(true);
+                ss.removeNode(removalId.toString());
             }
-        };
+            catch (Exception e)
+            {
+                System.err.println(e);
+                e.printStackTrace();
+                return;
+            }
+            success.set(true);
+        });
         remover.start();
 
         Thread.sleep(1000); // make sure removal is waiting for confirmation

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
index bb6e3a8..42aef0c 100644
--- a/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
+++ b/test/unit/org/apache/cassandra/utils/TopKSamplerTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 import com.clearspring.analytics.hash.MurmurHash;
 import com.clearspring.analytics.stream.Counter;
 import junit.framework.Assert;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.utils.TopKSampler.SamplerResult;
 
 public class TopKSamplerTest
@@ -82,7 +83,7 @@ public class TopKSamplerTest
         final CountDownLatch latch = new CountDownLatch(1);
         final TopKSampler<String> sampler = new TopKSampler<String>();
 
-        new Thread(new Runnable()
+        NamedThreadFactory.createThread(new Runnable()
         {
             public void run()
             {
@@ -99,7 +100,7 @@ public class TopKSamplerTest
             }
 
         }
-        ,"inserter").start();
+        , "inserter").start();
         try
         {
             // start/stop in fast iterations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/StressServer.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/StressServer.java b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
index 793f8f0..c00fb54 100644
--- a/tools/stress/src/org/apache/cassandra/stress/StressServer.java
+++ b/tools/stress/src/org/apache/cassandra/stress/StressServer.java
@@ -23,9 +23,11 @@ import java.io.PrintStream;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.cli.*;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.stress.settings.StressSettings;
 import org.apache.cassandra.stress.util.MultiResultLogger;
 import org.apache.cassandra.stress.util.ResultLogger;
@@ -39,6 +41,8 @@ public class StressServer
         availableOptions.addOption("h", "host", true, "Host to listen for connections.");
     }
 
+    private static final AtomicInteger threadCounter = new AtomicInteger(1);
+
     public static void main(String[] args) throws Exception
     {
         ServerSocket serverSocket = null;
@@ -93,7 +97,7 @@ public class StressServer
                 ResultLogger log = new MultiResultLogger(out);
 
                 StressAction action = new StressAction((StressSettings) in.readObject(), log);
-                Thread actionThread = new Thread(action);
+                Thread actionThread = NamedThreadFactory.createThread(action, "stress-" + threadCounter.incrementAndGet());
                 actionThread.start();
 
                 while (actionThread.isAlive())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cecbe17e/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
----------------------------------------------------------------------
diff --git a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
index f561f61..ff8b27f 100644
--- a/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
+++ b/tools/stress/src/org/apache/cassandra/stress/operations/userdefined/TokenRangeQuery.java
@@ -35,6 +35,7 @@ import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.TableMetadata;
 import com.datastax.driver.core.Token;
 import com.datastax.driver.core.TokenRange;
+import io.netty.util.concurrent.FastThreadLocal;
 import org.apache.cassandra.stress.Operation;
 import org.apache.cassandra.stress.StressYaml;
 import org.apache.cassandra.stress.WorkManager;
@@ -46,7 +47,7 @@ import org.apache.cassandra.stress.util.ThriftClient;
 
 public class TokenRangeQuery extends Operation
 {
-    private final ThreadLocal<State> currentState = new ThreadLocal<>();
+    private final FastThreadLocal<State> currentState = new FastThreadLocal<>();
 
     private final TableMetadata tableMetadata;
     private final TokenRangeIterator tokenRangeIterator;


Mime
View raw message