cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [03/15] cassandra git commit: Add a -j parameter to scrub/cleanup/upgradesstables to state how many threads to use
Date Tue, 29 Mar 2016 09:12:49 GMT
Add a -j parameter to scrub/cleanup/upgradesstables to state how many threads to use

Patch by marcuse; reviewed by Carl Yeksigian for CASSANDRA-11179


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b8a3f5b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b8a3f5b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b8a3f5b

Branch: refs/heads/trunk
Commit: 8b8a3f5b99fa7a8eefed14cd7e41b81773617046
Parents: a9b5422
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Mon Mar 14 11:01:11 2016 +0100
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Tue Mar 29 10:50:38 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 ++
 .../apache/cassandra/db/ColumnFamilyStore.java  | 12 +++----
 .../db/compaction/CompactionManager.java        | 35 +++++++++++++++-----
 .../cassandra/service/StorageService.java       | 24 +++++++++++---
 .../cassandra/service/StorageServiceMBean.java  |  6 ++++
 .../org/apache/cassandra/tools/NodeProbe.java   | 33 +++++++++++-------
 .../org/apache/cassandra/tools/NodeTool.java    | 21 ++++++++++--
 .../org/apache/cassandra/db/CleanupTest.java    |  6 ++--
 .../unit/org/apache/cassandra/db/ScrubTest.java | 18 +++++-----
 .../LeveledCompactionStrategyTest.java          |  2 +-
 10 files changed, 111 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f01114..7794d4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,8 @@
  * Gossiper#isEnabled is not thread safe (CASSANDRA-11116)
  * Avoid major compaction mixing repaired and unrepaired sstables in DTCS (CASSANDRA-11113)
  * test_bulk_round_trip_blogposts is failing occasionally (CASSANDRA-10938)
+ * Add a -j parameter to scrub/cleanup/upgradesstables to state how
+   many threads to use (CASSANDRA-11179)
 
 
 2.1.13

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index a78f33f..3d66d3a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1478,22 +1478,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return maxFile;
     }
 
-    public CompactionManager.AllSSTableOpStatus forceCleanup() throws ExecutionException,
InterruptedException
+    public CompactionManager.AllSSTableOpStatus forceCleanup(int jobs) throws ExecutionException,
InterruptedException
     {
-        return CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
+        return CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs);
     }
 
-    public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted,
boolean checkData) throws ExecutionException, InterruptedException
+    public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted,
boolean checkData, int jobs) throws ExecutionException, InterruptedException
     {
         // skip snapshot creation during scrub, SEE JIRA 5891
         if(!disableSnapshot)
             snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
-        return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted,
checkData);
+        return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted,
checkData, jobs);
     }
 
-    public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion)
throws ExecutionException, InterruptedException
+    public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion,
int jobs) throws ExecutionException, InterruptedException
     {
-        return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion);
+        return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion,
jobs);
     }
 
     public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index ec7cb45..e382cab 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -271,7 +271,17 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final
OneSSTableOperation operation) throws ExecutionException, InterruptedException
+    /**
+     * Run an operation over all sstables using jobs threads
+     *
+     * @param cfs the column family store to run the operation on
+     * @param operation the operation to run
+     * @param jobs the number of threads to use - 0 means use all available. It never uses
more than concurrent_compactors threads
+     * @return status of the operation
+     * @throws ExecutionException
+     * @throws InterruptedException
+     */
+    private AllSSTableOpStatus parallelAllSSTableOperation(final ColumnFamilyStore cfs, final
OneSSTableOperation operation, int jobs) throws ExecutionException, InterruptedException
     {
         Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
         if (compactingSSTables == null)
@@ -299,7 +309,8 @@ public class CompactionManager implements CompactionManagerMBean
                     logger.info("Executor has shut down, not submitting task");
                     return AllSSTableOpStatus.ABORTED;
                 }
-                futures.add(executor.submit(new Callable<Object>()
+
+                Callable<Object> callable = new Callable<Object>()
                 {
                     @Override
                     public Object call() throws Exception
@@ -315,7 +326,13 @@ public class CompactionManager implements CompactionManagerMBean
                         }
                         return this;
                     }
-                }));
+                };
+                futures.add(executor.submit(callable));
+                if (jobs > 0 && futures.size() == jobs)
+                {
+                    FBUtilities.waitOnFutures(futures);
+                    futures.clear();
+                }
             }
             FBUtilities.waitOnFutures(futures);
         }
@@ -341,7 +358,7 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
-    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted,
final boolean checkData) throws InterruptedException, ExecutionException
+    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted,
final boolean checkData, int jobs) throws InterruptedException, ExecutionException
     {
         assert !cfs.isIndex();
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
@@ -357,10 +374,10 @@ public class CompactionManager implements CompactionManagerMBean
             {
                 scrubOne(cfs, input, skipCorrupted, checkData);
             }
-        });
+        }, jobs);
     }
 
-    public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean
excludeCurrentVersion) throws InterruptedException, ExecutionException
+    public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean
excludeCurrentVersion, int jobs) throws InterruptedException, ExecutionException
     {
         return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
         {
@@ -385,10 +402,10 @@ public class CompactionManager implements CompactionManagerMBean
                 task.setCompactionType(OperationType.UPGRADE_SSTABLES);
                 task.execute(metrics);
             }
-        });
+        }, jobs);
     }
 
-    public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException,
ExecutionException
+    public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jobs) throws
InterruptedException, ExecutionException
     {
         assert !cfStore.isIndex();
         Keyspace keyspace = cfStore.keyspace;
@@ -416,7 +433,7 @@ public class CompactionManager implements CompactionManagerMBean
                 CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, ranges);
                 doCleanupOne(cfStore, input, cleanupStrategy, ranges, hasIndexes);
             }
-        });
+        }, jobs);
     }
 
     public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 98e2251..507aedb 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2385,13 +2385,18 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
 
     public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws
IOException, ExecutionException, InterruptedException
     {
+        return forceKeyspaceCleanup(0, keyspaceName, columnFamilies);
+    }
+
+    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
+    {
         if (keyspaceName.equals(Keyspace.SYSTEM_KS))
             throw new RuntimeException("Cleanup of the system keyspace is neither necessary
nor wise");
 
         CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName,
columnFamilies))
         {
-            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup();
+            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup(jobs);
             if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                 status = oneStatus;
         }
@@ -2400,27 +2405,36 @@ public class StorageService extends NotificationBroadcasterSupport
implements IE
 
     public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        return scrub(disableSnapshot, skipCorrupted, true, keyspaceName, columnFamilies);
+        return scrub(disableSnapshot, skipCorrupted, true, 0, keyspaceName, columnFamilies);
     }
 
     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String
keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
+        return scrub(disableSnapshot, skipCorrupted, checkData, 0, keyspaceName, columnFamilies);
+    }
+
+    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int
jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException,
InterruptedException
+    {
         CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName,
columnFamilies))
         {
-            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot,
skipCorrupted, checkData);
+            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot,
skipCorrupted, checkData, jobs);
             if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                 status = oneStatus;
         }
         return status.statusCode;
     }
-
     public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
+        return upgradeSSTables(keyspaceName, excludeCurrentVersion, 2, columnFamilies);
+    }
+
+    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
         CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName,
columnFamilies))
         {
-            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion);
+            CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion,
jobs);
             if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                 status = oneStatus;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8fa2433..d3a1725 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -260,7 +260,9 @@ public interface StorageServiceMBean extends NotificationEmitter
     /**
      * Trigger a cleanup of keys on a single keyspace
      */
+    @Deprecated
     public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws
IOException, ExecutionException, InterruptedException;
+    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException;
 
     /**
      * Scrub (deserialize + reserialize at the latest version, skipping bad rows if any)
the given keyspace.
@@ -270,13 +272,17 @@ public interface StorageServiceMBean extends NotificationEmitter
      */
     @Deprecated
     public int scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+    @Deprecated
     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String
keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
+    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int
jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException,
InterruptedException;
 
     /**
      * Rewrite all sstables to the latest version.
      * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first.
      */
+    @Deprecated
     public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException;
+    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 
     /**
      * Flush all memtables for the given column families, or all columnfamilies for the given
keyspace

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1ad1147..ab08e9f 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -63,6 +63,7 @@ import javax.management.remote.JMXServiceURL;
 import javax.rmi.ssl.SslRMIClientSocketFactory;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.HintedHandOffManager;
 import org.apache.cassandra.db.HintedHandOffManagerMBean;
@@ -237,42 +238,50 @@ public class NodeProbe implements AutoCloseable
         jmxc.close();
     }
 
-    public int forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws
IOException, ExecutionException, InterruptedException
+    public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
     {
-        return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies);
+        return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies);
     }
 
-    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, String
keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, int
jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException,
InterruptedException
     {
-        return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies);
+        return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName,
columnFamilies);
     }
 
-    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException
+    public int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, int jobs,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies);
+        return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies);
     }
 
-    public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, String... columnFamilies)
throws IOException, ExecutionException, InterruptedException
+    private void checkJobs(PrintStream out, int jobs)
     {
-        if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0)
+        if (jobs > DatabaseDescriptor.getConcurrentCompactors())
+            out.println(String.format("jobs (%d) is bigger than configured concurrent_compactors
(%d), using at most %d threads", jobs, DatabaseDescriptor.getConcurrentCompactors(), DatabaseDescriptor.getConcurrentCompactors()));
+    }
+
+    public void forceKeyspaceCleanup(PrintStream out, int jobs, String keyspaceName, String...
columnFamilies) throws IOException, ExecutionException, InterruptedException
+    {
+        if (forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies) != 0)
         {
             failed = true;
             out.println("Aborted cleaning up atleast one column family in keyspace "+keyspaceName+",
check server logs for more information.");
         }
     }
 
-    public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean
checkData, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException,
InterruptedException
+    public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, boolean
checkData, int jobs, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException,
InterruptedException
     {
-        if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, columnFamilies)
!= 0)
+        checkJobs(out, jobs);
+        if (scrub(disableSnapshot, skipCorrupted, checkData, jobs, keyspaceName, columnFamilies)
!= 0)
         {
             failed = true;
             out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+",
check server logs for more information.");
         }
     }
 
-    public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion,
String... columnFamilies) throws IOException, ExecutionException, InterruptedException
+    public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion,
int jobs, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
     {
-        if (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies) != 0)
+        checkJobs(out, jobs);
+        if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, columnFamilies) !=
0)
         {
             failed = true;
             out.println("Aborted upgrading sstables for atleast one column family in keyspace
"+keyspaceName+", check server logs for more information.");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 819049e..1de2e20 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -1138,6 +1138,11 @@ public class NodeTool
         @Arguments(usage = "[<keyspace> <cfnames>...]", description = "The keyspace
followed by one or many column families")
         private List<String> args = new ArrayList<>();
 
+        @Option(title = "jobs",
+                name = {"-j", "--jobs"},
+                description = "Number of sstables to cleanup simultaneusly, set to 0 to use
all available compaction threads")
+        private int jobs = 2;
+
         @Override
         public void execute(NodeProbe probe)
         {
@@ -1151,7 +1156,7 @@ public class NodeTool
 
                 try
                 {
-                    probe.forceKeyspaceCleanup(System.out, keyspace, cfnames);
+                    probe.forceKeyspaceCleanup(System.out, jobs, keyspace, cfnames);
                 } catch (Exception e)
                 {
                     throw new RuntimeException("Error occurred during cleanup", e);
@@ -1267,6 +1272,11 @@ public class NodeTool
                 description = "Do not validate columns using column validator")
         private boolean noValidation = false;
 
+        @Option(title = "jobs",
+                name = {"-j", "--jobs"},
+                description = "Number of sstables to scrub simultanously, set to 0 to use
all available compaction threads")
+        private int jobs = 2;
+
         @Override
         public void execute(NodeProbe probe)
         {
@@ -1277,7 +1287,7 @@ public class NodeTool
             {
                 try
                 {
-                    probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation,
keyspace, cfnames);
+                    probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation,
jobs, keyspace, cfnames);
                 } catch (Exception e)
                 {
                     throw new RuntimeException("Error occurred during flushing", e);
@@ -1345,6 +1355,11 @@ public class NodeTool
         @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description
= "Use -a to include all sstables, even those already on the current version")
         private boolean includeAll = false;
 
+        @Option(title = "jobs",
+                name = {"-j","--jobs"},
+                description = "Number of sstables to upgrade simultaneously, set to 0 to
use all available compaction threads")
+        private int jobs = 2;
+
         @Override
         public void execute(NodeProbe probe)
         {
@@ -1355,7 +1370,7 @@ public class NodeTool
             {
                 try
                 {
-                    probe.upgradeSSTables(System.out, keyspace, !includeAll, cfnames);
+                    probe.upgradeSSTables(System.out, keyspace, !includeAll, jobs, cfnames);
                 } catch (Exception e)
                 {
                     throw new RuntimeException("Error occurred during enabling auto-compaction",
e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/CleanupTest.java b/test/unit/org/apache/cassandra/db/CleanupTest.java
index 1d04dfa..7f54ed7 100644
--- a/test/unit/org/apache/cassandra/db/CleanupTest.java
+++ b/test/unit/org/apache/cassandra/db/CleanupTest.java
@@ -83,7 +83,7 @@ public class CleanupTest extends SchemaLoader
         assertEquals(LOOPS, rows.size());
 
         // with one token in the ring, owned by the local node, cleanup should be a no-op
-        CompactionManager.instance.performCleanup(cfs);
+        CompactionManager.instance.performCleanup(cfs, 2);
 
         // ensure max timestamp of the sstables are retained post-cleanup
         assert expectedMaxTimestamps.equals(getMaxTimestampList(cfs));
@@ -128,7 +128,7 @@ public class CleanupTest extends SchemaLoader
         tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
         tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
 
-        CompactionManager.instance.performCleanup(cfs);
+        CompactionManager.instance.performCleanup(cfs, 2);
 
         // row data should be gone
         rows = Util.getRangeSlice(cfs);
@@ -165,7 +165,7 @@ public class CleanupTest extends SchemaLoader
         tk2[0] = 1;
         tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
         tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
-        CompactionManager.instance.performCleanup(cfs);
+        CompactionManager.instance.performCleanup(cfs, 2);
 
         rows = Util.getRangeSlice(cfs);
         assertEquals(0, rows.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 167671b..4efd082 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -99,7 +99,7 @@ public class ScrubTest extends SchemaLoader
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
         assertEquals(1, rows.size());
 
-        CompactionManager.instance.performScrub(cfs, false, true);
+        CompactionManager.instance.performScrub(cfs, false, true, 2);
 
         // check data is still there
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -228,7 +228,7 @@ public class ScrubTest extends SchemaLoader
         SSTableReader sstable = cfs.getSSTables().iterator().next();
         overrideWithGarbage(sstable, 0, 2);
 
-        CompactionManager.instance.performScrub(cfs, false, true);
+        CompactionManager.instance.performScrub(cfs, false, true, 2);
 
         // check data is still there
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -249,7 +249,7 @@ public class ScrubTest extends SchemaLoader
         rm.applyUnsafe();
         cfs.forceBlockingFlush();
 
-        CompactionManager.instance.performScrub(cfs, false, true);
+        CompactionManager.instance.performScrub(cfs, false, true, 2);
         assert cfs.getSSTables().isEmpty();
     }
 
@@ -268,7 +268,7 @@ public class ScrubTest extends SchemaLoader
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
         assertEquals(10, rows.size());
 
-        CompactionManager.instance.performScrub(cfs, false, true);
+        CompactionManager.instance.performScrub(cfs, false, true, 2);
 
         // check data is still there
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -293,7 +293,7 @@ public class ScrubTest extends SchemaLoader
         for (SSTableReader sstable : cfs.getSSTables())
             new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)).delete();
 
-        CompactionManager.instance.performScrub(cfs, false, true);
+        CompactionManager.instance.performScrub(cfs, false, true, 2);
 
         // check data is still there
         rows = cfs.getRangeSlice(Util.range("", ""), null, new IdentityQueryFilter(), 1000);
@@ -506,7 +506,7 @@ public class ScrubTest extends SchemaLoader
 
         QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns
(a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')");
         cfs.forceBlockingFlush();
-        CompactionManager.instance.performScrub(cfs, false, true);
+        CompactionManager.instance.performScrub(cfs, false, true, 2);
 
         QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_scrub_validation (a text
primary key, b int)", ConsistencyLevel.ONE);
         ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("test_scrub_validation");
@@ -516,7 +516,7 @@ public class ScrubTest extends SchemaLoader
         mutation.apply();
         cfs2.forceBlockingFlush();
 
-        CompactionManager.instance.performScrub(cfs2, false, false);
+        CompactionManager.instance.performScrub(cfs2, false, false, 2);
     }
 
     /**
@@ -533,7 +533,7 @@ public class ScrubTest extends SchemaLoader
         Mutation mutation = new Mutation("Keyspace1", ByteBufferUtil.bytes(UUIDGen.getTimeUUID()),
cf);
         mutation.applyUnsafe();
         cfs.forceBlockingFlush();
-        CompactionManager.instance.performScrub(cfs, false, true);
+        CompactionManager.instance.performScrub(cfs, false, true, 2);
 
         assertEquals(1, cfs.getSSTables().size());
     }
@@ -554,7 +554,7 @@ public class ScrubTest extends SchemaLoader
         QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns
(a, b, c) VALUES (0, 'b', 'bar')");
         QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns
(a, b, c) VALUES (0, 'c', 'boo')");
         cfs.forceBlockingFlush();
-        CompactionManager.instance.performScrub(cfs, true, true);
+        CompactionManager.instance.performScrub(cfs, true, true, 2);
 
         // Scrub is silent, but it will remove broken records. So reading everything back
to make sure nothing to "scrubbed away"
         UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b8a3f5b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 7d33c11..749056c 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -328,7 +328,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         assertTrue(strategy.getAllLevelSize()[1] > 0);
 
         cfs.disableAutoCompaction();
-        cfs.sstablesRewrite(false);
+        cfs.sstablesRewrite(false, 2);
         assertTrue(strategy.getAllLevelSize()[1] > 0);
 
     }


Mime
View raw message