cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject cassandra git commit: Use an ExecutorService for repair commands instead of new Thread(..).start()
Date Mon, 14 Aug 2017 12:15:15 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk e9cc805db -> 62d39f654


Use an ExecutorService for repair commands instead of new Thread(..).start()

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-13594


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/62d39f65
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/62d39f65
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/62d39f65

Branch: refs/heads/trunk
Commit: 62d39f6544e3fbcbc268aecbb3a46950dcba2bf0
Parents: e9cc805
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Thu Jun 8 13:34:18 2017 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Mon Aug 14 14:12:34 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../JMXEnabledThreadPoolExecutor.java           | 14 ++++++++++++
 .../org/apache/cassandra/config/Config.java     |  9 ++++++++
 .../cassandra/config/DatabaseDescriptor.java    | 10 ++++++++
 .../cassandra/service/ActiveRepairService.java  | 24 ++++++++++++++++++++
 .../cassandra/service/StorageService.java       | 21 ++++++++++++++---
 6 files changed, 76 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/62d39f65/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7c9d79a..a6428d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
  * Fix race / ref leak in anticompaction (CASSANDRA-13688)
  * Expose tasks queue length via JMX (CASSANDRA-12758)
  * Fix race / ref leak in PendingRepairManager (CASSANDRA-13751)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62d39f65/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
index a7a54f2..2dafb4f 100644
--- a/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/JMXEnabledThreadPoolExecutor.java
@@ -21,6 +21,7 @@ import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -93,6 +94,19 @@ public class JMXEnabledThreadPoolExecutor extends DebuggableThreadPoolExecutor
i
         }
     }
 
+    public JMXEnabledThreadPoolExecutor(int corePoolSize,
+                                        int maxPoolSize,
+                                        long keepAliveTime,
+                                        TimeUnit unit,
+                                        BlockingQueue<Runnable> workQueue,
+                                        NamedThreadFactory threadFactory,
+                                        String jmxPath,
+                                        RejectedExecutionHandler rejectedExecutionHandler)
+    {
+        this(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue, threadFactory, jmxPath);
+        setRejectedExecutionHandler(rejectedExecutionHandler);
+    }
+
     public JMXEnabledThreadPoolExecutor(Stage stage)
     {
         this(stage.getJmxName(), stage.getJmxType());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62d39f65/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 22f3551..5a45282 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -348,6 +348,9 @@ public class Config
     public volatile boolean back_pressure_enabled = false;
     public volatile ParameterizedClass back_pressure_strategy;
 
+    public RepairCommandPoolFullStrategy repair_command_pool_full_strategy = RepairCommandPoolFullStrategy.queue;
+    public int repair_command_pool_size = concurrent_validations;
+
     /**
      * @deprecated migrate to {@link DatabaseDescriptor#isClientInitialized()}
      */
@@ -425,6 +428,12 @@ public class Config
         spinning
     }
 
+    public enum RepairCommandPoolFullStrategy
+    {
+        queue,
+        reject
+    }
+
     private static final List<String> SENSITIVE_KEYS = new ArrayList<String>()
{{
         add("client_encryption_options");
         add("server_encryption_options");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62d39f65/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 89c9432..fb50826 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -2328,4 +2328,14 @@ public class DatabaseDescriptor
     {
         conf.ideal_consistency_level = cl;
     }
+
+    public static int getRepairCommandPoolSize()
+    {
+        return conf.repair_command_pool_size;
+    }
+
+    public static Config.RepairCommandPoolFullStrategy getRepairCommandPoolFullStrategy()
+    {
+        return conf.repair_command_pool_full_strategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62d39f65/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index d50dc3f..2e02f0c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -42,7 +42,10 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.dht.Bounds;
@@ -77,6 +80,8 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDGen;
 
+import static org.apache.cassandra.config.Config.RepairCommandPoolFullStrategy.queue;
+
 /**
  * ActiveRepairService is the starting point for manual "active" repairs.
  *
@@ -125,6 +130,25 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber,
IFai
 
     private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions = new
ConcurrentHashMap<>();
 
+    public final static ExecutorService repairCommandExecutor;
+    static
+    {
+        Config.RepairCommandPoolFullStrategy strategy = DatabaseDescriptor.getRepairCommandPoolFullStrategy();
+        BlockingQueue<Runnable> queue;
+        if (strategy == Config.RepairCommandPoolFullStrategy.reject)
+            queue = new SynchronousQueue<>();
+        else
+            queue = new LinkedBlockingQueue<>();
+
+        repairCommandExecutor = new JMXEnabledThreadPoolExecutor(1,
+                                                                 DatabaseDescriptor.getRepairCommandPoolSize(),
+                                                                 1, TimeUnit.HOURS,
+                                                                 queue,
+                                                                 new NamedThreadFactory("Repair-Task"),
+                                                                 "internal",
+                                                                 new ThreadPoolExecutor.AbortPolicy());
+    }
+
     private final IFailureDetector failureDetector;
     private final Gossiper gossiper;
     private final Cache<Integer, Pair<ParentRepairStatus, List<String>>>
repairStatusByCmd;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/62d39f65/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 9070e89..5f691b4 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.auth.AuthSchemaChangeListener;
 import org.apache.cassandra.batchlog.BatchRemoveVerbHandler;
 import org.apache.cassandra.batchlog.BatchStoreVerbHandler;
 import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.ExecutorLocals;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Stage;
@@ -95,6 +96,7 @@ import org.apache.cassandra.service.paxos.PrepareVerbHandler;
 import org.apache.cassandra.service.paxos.ProposeVerbHandler;
 import org.apache.cassandra.streaming.*;
 import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.progress.ProgressEvent;
@@ -120,8 +122,6 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
 
     private final JMXProgressSupport progressSupport = new JMXProgressSupport(this);
 
-    private static final AtomicInteger threadCounter = new AtomicInteger(1);
-
     private static int getRingDelay()
     {
         String newdelay = System.getProperty("cassandra.ring_delay_ms");
@@ -3304,7 +3304,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
             return 0;
 
         int cmd = nextRepairCommand.incrementAndGet();
-        NamedThreadFactory.createThread(createRepairTask(cmd, keyspace, option), "Repair-Task-"
+ threadCounter.incrementAndGet()).start();
+        ActiveRepairService.repairCommandExecutor.execute(createRepairTask(cmd, keyspace,
option));
         return cmd;
     }
 
@@ -3360,6 +3360,21 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
 
         RepairRunnable task = new RepairRunnable(this, cmd, options, keyspace);
         task.addProgressListener(progressSupport);
+        if (options.isTraced())
+        {
+            Runnable r = () ->
+            {
+                try
+                {
+                    task.run();
+                }
+                finally
+                {
+                    ExecutorLocals.set(null);
+                }
+            };
+            return new FutureTask<>(r, null);
+        }
         return new FutureTask<>(task, null);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message