cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/2] cassandra git commit: Centralize shared executors
Date Wed, 19 Nov 2014 23:10:52 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 1bd5c64ba -> 8badc2856


Centralize shared executors

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-8055


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4397c344
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4397c344
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4397c344

Branch: refs/heads/trunk
Commit: 4397c34476070ea15ee0d2b9c625887a8b08b622
Parents: e3862bc
Author: Sam Tunnicliffe <sam@beobal.com>
Authored: Thu Nov 20 01:42:03 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Thu Nov 20 01:57:01 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/auth/Auth.java    | 17 ++++----
 .../cassandra/auth/PasswordAuthenticator.java   | 18 ++++----
 .../apache/cassandra/cache/AutoSavingCache.java | 10 ++---
 .../concurrent/ScheduledExecutors.java          | 43 ++++++++++++++++++++
 .../apache/cassandra/cql3/QueryProcessor.java   |  5 +--
 .../apache/cassandra/db/BatchlogManager.java    |  8 +++-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 42 +++++++++++--------
 .../cassandra/db/HintedHandOffManager.java      |  7 ++--
 .../db/commitlog/CommitLogArchiver.java         |  2 +-
 .../io/sstable/SSTableDeletingTask.java         |  6 +--
 .../cassandra/io/sstable/SSTableReader.java     |  3 +-
 .../org/apache/cassandra/io/util/FileUtils.java |  4 +-
 .../locator/DynamicEndpointSnitch.java          |  5 ++-
 .../apache/cassandra/net/MessagingService.java  |  4 +-
 .../cassandra/service/CassandraDaemon.java      |  3 +-
 .../cassandra/service/LoadBroadcaster.java      |  3 +-
 .../cassandra/service/MigrationManager.java     |  3 +-
 .../cassandra/service/StorageService.java       | 38 ++++-------------
 .../apache/cassandra/utils/ResourceWatcher.java |  4 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |  6 +--
 .../org/apache/cassandra/db/KeyCacheTest.java   |  5 +--
 22 files changed, 139 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c00e671..41a5aaf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Centralize shared executors (CASSANDRA-8055)
  * Fix filtering for CONTAINS (KEY) relations on frozen collection
    clustering columns when the query is restricted to a single
    partition (CASSANDRA-8203)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/auth/Auth.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/Auth.java b/src/java/org/apache/cassandra/auth/Auth.java
index 4f18111..ed7aa87 100644
--- a/src/java/org/apache/cassandra/auth/Auth.java
+++ b/src/java/org/apache/cassandra/auth/Auth.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
@@ -189,15 +190,13 @@ public class Auth implements AuthMBean
         // the delay is here to give the node some time to see its peers - to reduce
         // "Skipped default superuser setup: some nodes were not ready" log spam.
         // It's the only reason for the delay.
-        StorageService.tasks.schedule(new Runnable()
-                                      {
-                                          public void run()
-                                          {
-                                              setupDefaultSuperuser();
-                                          }
-                                      },
-                                      SUPERUSER_SETUP_DELAY,
-                                      TimeUnit.MILLISECONDS);
+        ScheduledExecutors.nonPeriodicTasks.schedule(new Runnable()
+        {
+            public void run()
+            {
+                setupDefaultSuperuser();
+            }
+        }, SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 1218ee2..9570770 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -37,7 +38,6 @@ import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.mindrot.jbcrypt.BCrypt;
@@ -169,15 +169,13 @@ public class PasswordAuthenticator implements ISaslAwareAuthenticator
         // the delay is here to give the node some time to see its peers - to reduce
         // "skipped default user setup: some nodes are were not ready" log spam.
         // It's the only reason for the delay.
-        StorageService.tasks.schedule(new Runnable()
-                                      {
-                                          public void run()
-                                          {
-                                              setupDefaultUser();
-                                          }
-                                      },
-                                      Auth.SUPERUSER_SETUP_DELAY,
-                                      TimeUnit.MILLISECONDS);
+        ScheduledExecutors.nonPeriodicTasks.schedule(new Runnable()
+        {
+            public void run()
+            {
+              setupDefaultUser();
+            }
+        }, Auth.SUPERUSER_SETUP_DELAY, TimeUnit.MILLISECONDS);
 
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index fca939a..2117eb8 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -27,6 +27,7 @@ import org.cliffc.high_scale_lib.NonBlockingHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
@@ -39,7 +40,6 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
 
@@ -121,10 +121,10 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     submitWrite(keysToSave);
                 }
             };
-            saveTask = StorageService.optionalTasks.scheduleWithFixedDelay(runnable,
-                                                                           savePeriodInSeconds,
-                                                                           savePeriodInSeconds,
-                                                                           TimeUnit.SECONDS);
+            saveTask = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(runnable,
+                                                                               savePeriodInSeconds,
+                                                                               savePeriodInSeconds,
+                                                                               TimeUnit.SECONDS);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
new file mode 100644
index 0000000..5935669
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/ScheduledExecutors.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+/**
+ * Centralized location for shared executors
+ */
+public class ScheduledExecutors
+{
+    /**
+     * This pool is used for periodic short (sub-second) tasks.
+     */
+     public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks");
+
+    /**
+     * This executor is used for tasks that can have longer execution times, and usually are non periodic.
+     */
+    public static final DebuggableScheduledThreadPoolExecutor nonPeriodicTasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
+    static
+    {
+        nonPeriodicTasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+    }
+
+    /**
+     * This executor is used for tasks that do not need to be waited for on shutdown/drain.
+     */
+    public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 9f71d71..45ef39c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -22,7 +22,6 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
@@ -33,6 +32,7 @@ import org.github.jamm.MemoryMeter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.cql3.statements.*;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.composites.*;
@@ -89,7 +89,6 @@ public class QueryProcessor implements QueryHandler
     public static final CQLMetrics metrics = new CQLMetrics();
 
     private static final AtomicInteger lastMinuteEvictionsCount = new AtomicInteger(0);
-    private static final ScheduledExecutorService evictionCheckTimer = Executors.newScheduledThreadPool(1);
 
     static
     {
@@ -118,7 +117,7 @@ public class QueryProcessor implements QueryHandler
                                    })
                                    .build();
 
-        evictionCheckTimer.scheduleAtFixedRate(new Runnable()
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(new Runnable()
         {
             public void run()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 279f876..20f134d 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -70,7 +70,7 @@ public class BatchlogManager implements BatchlogManagerMBean
     private final AtomicLong totalBatchesReplayed = new AtomicLong();
 
     // Single-thread executor service for scheduling and serializing log replay.
-    public static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
+    private static final ScheduledExecutorService batchlogTasks = new DebuggableScheduledThreadPoolExecutor("BatchlogTasks");
 
     public void start()
     {
@@ -95,6 +95,12 @@ public class BatchlogManager implements BatchlogManagerMBean
         batchlogTasks.scheduleWithFixedDelay(runnable, StorageService.RING_DELAY, REPLAY_INTERVAL, TimeUnit.MILLISECONDS);
     }
 
+    public static void shutdown() throws InterruptedException
+    {
+        batchlogTasks.shutdown();
+        batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     public int countAllBatches()
     {
         String query = String.format("SELECT count(*) FROM %s.%s", Keyspace.SYSTEM_KS, SystemKeyspace.BATCHLOG_CF);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0fa50bb..7e1dd18 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -33,15 +33,13 @@ import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.io.FSWriteError;
+
 import org.json.simple.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.*;
-import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
-import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.*;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.config.CFMetaData.SpeculativeRetry;
 import org.apache.cassandra.db.commitlog.CommitLog;
@@ -61,6 +59,7 @@ import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -86,18 +85,21 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                                                         
 new LinkedBlockingQueue<Runnable>(),
                                                                                         
 new NamedThreadFactory("MemtableFlushWriter"),
                                                                                         
 "internal");
+
     // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
-    public static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
-                                                                                        
    StageManager.KEEPALIVE,
-                                                                                        
    TimeUnit.SECONDS,
-                                                                                        
    new LinkedBlockingQueue<Runnable>(),
-                                                                                        
    new NamedThreadFactory("MemtablePostFlush"),
-                                                                                        
    "internal");
-    public static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
StageManager.KEEPALIVE,
-                                                                                        
  TimeUnit.SECONDS,
-                                                                                        
  new LinkedBlockingQueue<Runnable>(),
-                                                                                        
  new NamedThreadFactory("MemtableReclaimMemory"),
-                                                                                        
  "internal");
+    private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
+                                                                                        
     StageManager.KEEPALIVE,
+                                                                                        
     TimeUnit.SECONDS,
+                                                                                        
     new LinkedBlockingQueue<Runnable>(),
+                                                                                        
     new NamedThreadFactory("MemtablePostFlush"),
+                                                                                        
     "internal");
+
+    private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
+                                                                                        
   StageManager.KEEPALIVE,
+                                                                                        
   TimeUnit.SECONDS,
+                                                                                        
   new LinkedBlockingQueue<Runnable>(),
+                                                                                        
   new NamedThreadFactory("MemtableReclaimMemory"),
+                                                                                        
   "internal");
 
     public final Keyspace keyspace;
     public final String name;
@@ -134,6 +136,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     public final ColumnFamilyMetrics metric;
     public volatile long sampleLatencyNanos;
 
+    public static void shutdownPostFlushExecutor() throws InterruptedException
+    {
+        postFlushExecutor.shutdown();
+        postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+    }
+
     public void reload()
     {
         // metadata object has been mutated directly. make all the members jibe with new settings.
@@ -188,7 +196,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                     }
                 }
             };
-            StorageService.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS);
+            ScheduledExecutors.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -310,7 +318,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             throw new RuntimeException(e);
         }
         logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry());
-        StorageService.optionalTasks.scheduleWithFixedDelay(new Runnable()
+        ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable()
         {
             public void run()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index 0e68a71..ad8546e 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
@@ -176,7 +177,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 metrics.log();
             }
         };
-        StorageService.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
+        ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(runnable, 10, 10, TimeUnit.MINUTES);
     }
 
     private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
@@ -228,7 +229,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 }
             }
         };
-        StorageService.optionalTasks.submit(runnable);
+        ScheduledExecutors.optionalTasks.submit(runnable);
     }
 
     //foobar
@@ -249,7 +250,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 }
             }
         };
-        StorageService.optionalTasks.submit(runnable).get();
+        ScheduledExecutors.optionalTasks.submit(runnable).get();
 
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
index 1b1a1e0..6cba603 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java
@@ -52,7 +52,7 @@ public class CommitLogArchiver
     }
 
     public final Map<String, Future<?>> archivePending = new ConcurrentHashMap<String, Future<?>>();
-    public final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver");
+    private final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver");
     private final String archiveCommand;
     private final String restoreCommand;
     private final String restoreDirectories;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
index d95dff7..fb1cbb3 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableDeletingTask.java
@@ -27,9 +27,9 @@ import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class SSTableDeletingTask implements Runnable
@@ -69,7 +69,7 @@ public class SSTableDeletingTask implements Runnable
 
     public void schedule()
     {
-        StorageService.tasks.submit(this);
+        ScheduledExecutors.nonPeriodicTasks.submit(this);
     }
 
     public void run()
@@ -119,7 +119,7 @@ public class SSTableDeletingTask implements Runnable
             }
         };
 
-        FBUtilities.waitOnFuture(StorageService.tasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
+        FBUtilities.waitOnFuture(ScheduledExecutors.nonPeriodicTasks.schedule(runnable, 0, TimeUnit.MILLISECONDS));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 8f302f3..a3e3cf5 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.cache.CachingOptions;
 import org.apache.cassandra.cache.InstrumentingCache;
 import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.Config;
@@ -635,7 +636,7 @@ public class SSTableReader extends SSTable
         else
             barrier = null;
 
-        StorageService.tasks.execute(new Runnable()
+        ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
         {
             public void run()
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 295679e..7d187ac 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -38,7 +38,7 @@ import sun.nio.ch.DirectBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.Config;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.BlacklistedDirectories;
 import org.apache.cassandra.db.Keyspace;
@@ -326,7 +326,7 @@ public class FileUtils
                 deleteWithConfirm(new File(file));
             }
         };
-        StorageService.tasks.execute(runnable);
+        ScheduledExecutors.nonPeriodicTasks.execute(runnable);
     }
 
     public static String stringifyFileSize(double value)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
index 49442c8..e4b714c 100644
--- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
+++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
@@ -84,8 +85,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa
                 reset();
             }
         };
-        StorageService.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
-        StorageService.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(update, UPDATE_INTERVAL_IN_MS, UPDATE_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(reset, RESET_INTERVAL_IN_MS, RESET_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
         registerMBean();
    }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 05b449c..73bc9ff 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,6 +37,8 @@ import com.google.common.collect.Lists;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.concurrent.TracingAwareExecutorService;
@@ -329,7 +331,7 @@ public final class MessagingService implements MessagingServiceMBean
                 logDroppedMessages();
             }
         };
-        StorageService.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
+        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(logDropped, LOG_DROPPED_INTERVAL_IN_MS, LOG_DROPPED_INTERVAL_IN_MS, TimeUnit.MILLISECONDS);
 
         Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, ?> timeoutReporter = new Function<Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>>, Object>()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 5897a22..1c99348 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import com.addthis.metrics.reporter.config.ReporterConfig;
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -329,7 +330,7 @@ public class CassandraDaemon
                 }
             }
         };
-        StorageService.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
+        ScheduledExecutors.optionalTasks.schedule(runnable, 5 * 60, TimeUnit.SECONDS);
 
         SystemKeyspace.finishStartup();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/LoadBroadcaster.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/LoadBroadcaster.java b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
index 4996e52..d12ffba 100644
--- a/src/java/org/apache/cassandra/service/LoadBroadcaster.java
+++ b/src/java/org/apache/cassandra/service/LoadBroadcaster.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.gms.*;
 
 public class LoadBroadcaster implements IEndpointStateChangeSubscriber
@@ -91,7 +92,7 @@ public class LoadBroadcaster implements IEndpointStateChangeSubscriber
                                                            StorageService.instance.valueFactory.load(StorageService.instance.getLoad()));
             }
         };
-        StorageService.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
+        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(runnable, 2 * Gossiper.intervalInMillis, BROADCAST_INTERVAL, TimeUnit.MILLISECONDS);
     }
 }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index bdae208..ce4dca4 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -32,6 +32,7 @@ import java.lang.management.RuntimeMXBean;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -126,7 +127,7 @@ public class MigrationManager
                     submitMigrationTask(endpoint);
                 }
             };
-            StorageService.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
+            ScheduledExecutors.optionalTasks.schedule(runnable, MIGRATION_DELAY_IN_MS, TimeUnit.MILLISECONDS);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 29054f4..ae8c798 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -54,7 +54,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.Auth;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
@@ -121,24 +121,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             return 30 * 1000;
     }
 
-    /**
-     * This pool is used for periodic short (sub-second) tasks.
-     */
-     public static final DebuggableScheduledThreadPoolExecutor scheduledTasks = new DebuggableScheduledThreadPoolExecutor("ScheduledTasks");
-
-    /**
-     * This pool is used by tasks that can have longer execution times, and usually are non periodic.
-     */
-    public static final DebuggableScheduledThreadPoolExecutor tasks = new DebuggableScheduledThreadPoolExecutor("NonPeriodicTasks");
-    /**
-     * tasks that do not need to be waited for on shutdown/drain
-     */
-    public static final DebuggableScheduledThreadPoolExecutor optionalTasks = new DebuggableScheduledThreadPoolExecutor("OptionalTasks");
-    static
-    {
-        tasks.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
-    }
-
     /* This abstraction maintains the token/endpoint metadata information */
     private TokenMetadata tokenMetadata = new TokenMetadata();
 
@@ -597,7 +579,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
                 if (daemon != null)
                 	shutdownClientServers();
-                optionalTasks.shutdown();
+                ScheduledExecutors.optionalTasks.shutdown();
                 Gossiper.instance.stop();
 
                 // In-progress writes originating here could generate hints to be written, so shut down MessagingService
@@ -633,8 +615,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 CommitLog.instance.shutdownBlocking();
 
                 // wait for miscellaneous tasks like sstable and commitlog segment deletion
-                tasks.shutdown();
-                if (!tasks.awaitTermination(1, TimeUnit.MINUTES))
+                ScheduledExecutors.nonPeriodicTasks.shutdown();
+                if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
                     logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
             }
         }, "StorageServiceShutdownHook");
@@ -3602,7 +3584,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         setMode(Mode.DRAINING, "starting drain process", true);
         shutdownClientServers();
-        optionalTasks.shutdown();
+        ScheduledExecutors.optionalTasks.shutdown();
         Gossiper.instance.stop();
 
         setMode(Mode.DRAINING, "shutting down MessageService", false);
@@ -3647,21 +3629,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         }
         FBUtilities.waitOnFutures(flushes);
 
-        BatchlogManager.batchlogTasks.shutdown();
-        BatchlogManager.batchlogTasks.awaitTermination(60, TimeUnit.SECONDS);
+        BatchlogManager.shutdown();
 
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
         // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
         CommitLog.instance.forceRecycleAllSegments();
 
-        ColumnFamilyStore.postFlushExecutor.shutdown();
-        ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
+        ColumnFamilyStore.shutdownPostFlushExecutor();
 
         CommitLog.instance.shutdownBlocking();
 
         // wait for miscellaneous tasks like sstable and commitlog segment deletion
-        tasks.shutdown();
-        if (!tasks.awaitTermination(1, TimeUnit.MINUTES))
+        ScheduledExecutors.nonPeriodicTasks.shutdown();
+        if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, TimeUnit.MINUTES))
             logger.warn("Miscellaneous task executor still busy after one minute; proceeding with shutdown");
 
         setMode(Mode.DRAINED, true);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/src/java/org/apache/cassandra/utils/ResourceWatcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ResourceWatcher.java b/src/java/org/apache/cassandra/utils/ResourceWatcher.java
index 2dfab95..5e7cbdd 100644
--- a/src/java/org/apache/cassandra/utils/ResourceWatcher.java
+++ b/src/java/org/apache/cassandra/utils/ResourceWatcher.java
@@ -23,13 +23,13 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 
 public class ResourceWatcher
 {
     public static void watch(String resource, Runnable callback, int period)
     {
-        StorageService.scheduledTasks.scheduleWithFixedDelay(new WatchedResource(resource, callback), period, period, TimeUnit.MILLISECONDS);
+        ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay(new WatchedResource(resource, callback), period, period, TimeUnit.MILLISECONDS);
     }
 
     public static class WatchedResource implements Runnable

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 470b701..dd22896 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.Directories;
@@ -43,7 +44,6 @@ import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.serializers.TypeSerializer;
-import org.apache.cassandra.service.StorageService;
 
 /**
  * Base class for CQL tests.
@@ -88,7 +88,7 @@ public abstract class CQLTester
         currentTypes.clear();
 
         // We want to clean up after the test, but dropping a table is rather long so just do that asynchronously
-        StorageService.optionalTasks.execute(new Runnable()
+        ScheduledExecutors.optionalTasks.execute(new Runnable()
         {
             public void run()
             {
@@ -105,7 +105,7 @@ public abstract class CQLTester
                     // mono-threaded, just push a task on the queue to find when it's empty. No perfect but good enough.
 
                     final CountDownLatch latch = new CountDownLatch(1);
-                    StorageService.tasks.execute(new Runnable()
+                    ScheduledExecutors.nonPeriodicTasks.execute(new Runnable()
                     {
                             public void run()
                             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4397c344/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index c0560ab..1bc7caf 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -31,12 +30,12 @@ import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.KeyCacheKey;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
@@ -165,7 +164,7 @@ public class KeyCacheTest extends SchemaLoader
             reader.releaseReference();
 
         Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS);;
-        while (StorageService.tasks.getActiveCount() + StorageService.tasks.getQueue().size() > 0);
+        while (ScheduledExecutors.nonPeriodicTasks.getActiveCount() + ScheduledExecutors.nonPeriodicTasks.getQueue().size() > 0);
 
         // after releasing the reference this should drop to 2
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);


Mime
View raw message