cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [1/3] cassandra git commit: Add ability to stop compaction by ID
Date Wed, 20 May 2015 15:34:44 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 f9b6d3dac -> 52dc63b6c
  refs/heads/trunk e194fe9d8 -> 2e48b6af9


Add ability to stop compaction by ID

patch by Lyuben Todorov; reviewed by yukim for CASSANDRA-7207


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/52dc63b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/52dc63b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/52dc63b6

Branch: refs/heads/cassandra-2.2
Commit: 52dc63b6c75ca3f68e4c90dc76f8e1cc65690fb5
Parents: f9b6d3d
Author: Lyuben Todorov <lyuben@mail.com>
Authored: Tue May 19 11:36:38 2015 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Wed May 20 10:30:16 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cache/AutoSavingCache.java |  4 +++-
 .../compaction/AbstractCompactionIterable.java  |  8 ++++++--
 .../cassandra/db/compaction/CompactionInfo.java | 20 +++++++++++++------
 .../db/compaction/CompactionIterable.java       |  9 +++++++--
 .../db/compaction/CompactionManager.java        | 21 +++++++++++++++++---
 .../db/compaction/CompactionManagerMBean.java   |  8 ++++++++
 .../cassandra/db/compaction/CompactionTask.java |  2 +-
 .../cassandra/db/compaction/Scrubber.java       |  6 +++++-
 .../cassandra/db/compaction/Upgrader.java       |  3 ++-
 .../cassandra/db/compaction/Verifier.java       |  6 +++++-
 .../db/index/SecondaryIndexBuilder.java         |  7 ++++++-
 .../org/apache/cassandra/tools/NodeProbe.java   |  5 +++++
 .../tools/nodetool/CompactionStats.java         |  9 +++++----
 .../apache/cassandra/tools/nodetool/Stop.java   | 17 ++++++++++++++--
 15 files changed, 101 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a227f5e..44f9e4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 2.2
  * Ensure that UDF and UDAs are keyspace-isolated (CASSANDRA-9409)
  * Revert CASSANDRA-7807 (tracing completion client notifications) (CASSANDRA-9429)
+ * Add ability to stop compaction by ID (CASSANDRA-7207)
 Merged from 2.1:
  * Use configured gcgs in anticompaction (CASSANDRA-9397)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 7a9c3da..b381224 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K, V>
 {
@@ -210,7 +211,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                                       type,
                                       0,
                                       keysEstimate,
-                                      "keys");
+                                      "keys",
+                                      UUIDGen.getTimeUUID());
         }
 
         public CacheService.CacheType cacheType()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index 5ac2c8b..9fe8fd9 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.cassandra.io.sstable.ISSTableScanner;
@@ -30,6 +31,7 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
     protected final long totalBytes;
     protected volatile long bytesRead;
     protected final List<ISSTableScanner> scanners;
+    protected final UUID compactionId;
     /*
      * counters for merged rows.
      * array index represents (number of merged rows - 1), so index 0 is counter for no merge (1 row),
@@ -37,12 +39,13 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
      */
     protected final AtomicLong[] mergeCounters;
 
-    public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners)
+    public AbstractCompactionIterable(CompactionController controller, OperationType type, List<ISSTableScanner> scanners, UUID compactionId)
     {
         this.controller = controller;
         this.type = type;
         this.scanners = scanners;
         this.bytesRead = 0;
+        this.compactionId = compactionId;
 
         long bytes = 0;
         for (ISSTableScanner scanner : scanners)
@@ -58,7 +61,8 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
         return new CompactionInfo(controller.cfs.metadata,
                                   type,
                                   bytesRead,
-                                  totalBytes);
+                                  totalBytes,
+                                  compactionId);
     }
 
     protected void updateCounterFor(int rows)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
index 3ee3a68..ff8c022 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionInfo.java
@@ -35,30 +35,32 @@ public final class CompactionInfo implements Serializable
     private final long completed;
     private final long total;
     private final String unit;
+    private final UUID compactionId;
 
-    public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes)
+    public CompactionInfo(CFMetaData cfm, OperationType tasktype, long bytesComplete, long totalBytes, UUID compactionId)
     {
-        this(cfm, tasktype, bytesComplete, totalBytes, "bytes");
+        this(cfm, tasktype, bytesComplete, totalBytes, "bytes", compactionId);
     }
 
-    public CompactionInfo(OperationType tasktype, long completed, long total, String unit)
+    public CompactionInfo(OperationType tasktype, long completed, long total, String unit, UUID compactionId)
     {
-        this(null, tasktype, completed, total, unit);
+        this(null, tasktype, completed, total, unit, compactionId);
     }
 
-    public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, String unit)
+    public CompactionInfo(CFMetaData cfm, OperationType tasktype, long completed, long total, String unit, UUID compactionId)
     {
         this.tasktype = tasktype;
         this.completed = completed;
         this.total = total;
         this.cfm = cfm;
         this.unit = unit;
+        this.compactionId = compactionId;
     }
 
     /** @return A copy of this CompactionInfo with updated progress. */
     public CompactionInfo forProgress(long complete, long total)
     {
-        return new CompactionInfo(cfm, tasktype, complete, total, unit);
+        return new CompactionInfo(cfm, tasktype, complete, total, unit, compactionId);
     }
 
     public UUID getId()
@@ -96,6 +98,11 @@ public final class CompactionInfo implements Serializable
         return tasktype;
     }
 
+    public UUID compactionId()
+    {
+        return compactionId;
+    }
+
     public String toString()
     {
         StringBuilder buff = new StringBuilder();
@@ -115,6 +122,7 @@ public final class CompactionInfo implements Serializable
         ret.put("total", Long.toString(total));
         ret.put("taskType", tasktype.toString());
         ret.put("unit", unit);
+        ret.put("compactionId", compactionId == null ? "" : compactionId.toString());
         return ret;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index cd08b81..23d8a4a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compaction;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.UUID;
 
 import com.google.common.collect.ImmutableList;
 
@@ -41,9 +42,13 @@ public class CompactionIterable extends AbstractCompactionIterable
         }
     };
 
-    public CompactionIterable(OperationType type, List<ISSTableScanner> scanners, CompactionController controller, SSTableFormat.Type formatType)
+    public CompactionIterable(OperationType type,
+                              List<ISSTableScanner> scanners,
+                              CompactionController controller,
+                              SSTableFormat.Type formatType,
+                              UUID compactionId)
     {
-        super(controller, type, scanners);
+        super(controller, type, scanners, compactionId);
         this.format = formatType.info;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index cda6915..d79b835 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -77,6 +78,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.MerkleTree;
 import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Refs;
 
@@ -1184,7 +1186,7 @@ public class CompactionManager implements CompactionManagerMBean
             repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
             unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
 
-            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat());
+            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
             Iterator<AbstractCompactedRow> iter = ci.iterator();
             metrics.beginCompaction(ci);
             try
@@ -1309,7 +1311,7 @@ public class CompactionManager implements CompactionManagerMBean
     {
         public ValidationCompactionIterable(ColumnFamilyStore cfs, List<ISSTableScanner> scanners, int gcBefore)
         {
-            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat());
+            super(OperationType.VALIDATION, scanners, new ValidationCompactionController(cfs, gcBefore), DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID());
         }
     }
 
@@ -1477,11 +1479,13 @@ public class CompactionManager implements CompactionManagerMBean
     {
         private final SSTableReader sstable;
         private final ISSTableScanner scanner;
+        private final UUID cleanupCompactionId;
 
         public CleanupInfo(SSTableReader sstable, ISSTableScanner scanner)
         {
             this.sstable = sstable;
             this.scanner = scanner;
+            cleanupCompactionId = UUIDGen.getTimeUUID();
         }
 
         public CompactionInfo getCompactionInfo()
@@ -1491,7 +1495,8 @@ public class CompactionManager implements CompactionManagerMBean
                 return new CompactionInfo(sstable.metadata,
                                           OperationType.CLEANUP,
                                           scanner.getCurrentPosition(),
-                                          scanner.getLengthInBytes());
+                                          scanner.getLengthInBytes(),
+                                          cleanupCompactionId);
             }
             catch (Exception e)
             {
@@ -1510,6 +1515,16 @@ public class CompactionManager implements CompactionManagerMBean
         }
     }
 
+    public void stopCompactionById(String compactionId)
+    {
+        for (Holder holder : CompactionMetrics.getCompactions())
+        {
+            UUID holderId = holder.getCompactionInfo().compactionId();
+            if (holderId != null && holderId.equals(UUID.fromString(compactionId)))
+                holder.stop();
+        }
+    }
+
     public int getCoreCompactorThreads()
     {
         return executor.getCorePoolSize();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
index 9c36192..8e200a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManagerMBean.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.compaction;
 
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import javax.management.openmbean.TabularData;
 
 public interface CompactionManagerMBean
@@ -55,6 +56,13 @@ public interface CompactionManagerMBean
     public void stopCompaction(String type);
 
     /**
+     * Stop an individual running compaction using the compactionId.
+     * @param compactionId Compaction ID of compaction to stop. Such IDs can be found in
+     *                     the compactions_in_progress table of the system keyspace.
+     */
+    public void stopCompactionById(String compactionId);
+
+    /**
      * Returns core size of compaction thread pool
      */
     public int getCoreCompactorThreads();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index c397d9a..34f57c1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -162,7 +162,7 @@ public class CompactionTask extends AbstractCompactionTask
             // See CASSANDRA-8019 and CASSANDRA-8399
             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
             {
-                ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat);
+                ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId);
                 Iterator<AbstractCompactedRow> iter = ci.iterator();
                 if (collector != null)
                     collector.beginCompaction(ci);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 310d58a..1e014ed 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -35,6 +35,7 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class Scrubber implements Closeable
 {
@@ -419,11 +420,13 @@ public class Scrubber implements Closeable
     {
         private final RandomAccessReader dataFile;
         private final SSTableReader sstable;
+        private final UUID scrubCompactionId;
 
         public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
         {
             this.dataFile = dataFile;
             this.sstable = sstable;
+            scrubCompactionId = UUIDGen.getTimeUUID();
         }
 
         public CompactionInfo getCompactionInfo()
@@ -433,7 +436,8 @@ public class Scrubber implements Closeable
                 return new CompactionInfo(sstable.metadata,
                                           OperationType.SCRUB,
                                           dataFile.getFilePointer(),
-                                          dataFile.length());
+                                          dataFile.length(),
+                                          scrubCompactionId);
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 30584fd..5bb1530 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class Upgrader
 {
@@ -85,7 +86,7 @@ public class Upgrader
         try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
              AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
         {
-            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
+            Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat(), UUIDGen.getTimeUUID()).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/compaction/Verifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java
index 1d37c6f..0177819 100644
--- a/src/java/org/apache/cassandra/db/compaction/Verifier.java
+++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.utils.UUIDGen;
 
 import java.io.Closeable;
 import java.io.File;
@@ -241,11 +242,13 @@ public class Verifier implements Closeable
     {
         private final RandomAccessReader dataFile;
         private final SSTableReader sstable;
+        private final UUID verificationCompactionId;
 
         public VerifyInfo(RandomAccessReader dataFile, SSTableReader sstable)
         {
             this.dataFile = dataFile;
             this.sstable = sstable;
+            verificationCompactionId = UUIDGen.getTimeUUID();
         }
 
         public CompactionInfo getCompactionInfo()
@@ -255,7 +258,8 @@ public class Verifier implements Closeable
                 return new CompactionInfo(sstable.metadata,
                                           OperationType.VERIFY,
                                           dataFile.getFilePointer(),
-                                          dataFile.length());
+                                          dataFile.length(),
+                                          verificationCompactionId);
             }
             catch (Exception e)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
index eb09e43..916c286 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexBuilder.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.index;
 
 import java.io.IOException;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
@@ -27,6 +28,7 @@ import org.apache.cassandra.db.compaction.CompactionInfo;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.compaction.CompactionInterruptedException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
+import org.apache.cassandra.utils.UUIDGen;
 
 /**
  * Manages building an entire index from column family data. Runs on to compaction manager.
@@ -36,12 +38,14 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
     private final ColumnFamilyStore cfs;
     private final Set<String> idxNames;
     private final ReducingKeyIterator iter;
+    private final UUID compactionId;
 
     public SecondaryIndexBuilder(ColumnFamilyStore cfs, Set<String> idxNames, ReducingKeyIterator iter)
     {
         this.cfs = cfs;
         this.idxNames = idxNames;
         this.iter = iter;
+        compactionId = UUIDGen.getTimeUUID();
     }
 
     public CompactionInfo getCompactionInfo()
@@ -49,7 +53,8 @@ public class SecondaryIndexBuilder extends CompactionInfo.Holder
         return new CompactionInfo(cfs.metadata,
                                   OperationType.INDEX_BUILD,
                                   iter.getBytesRead(),
-                                  iter.getTotalBytes());
+                                  iter.getTotalBytes(),
+                                  compactionId);
     }
 
     public void build()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 1341c68..f10a4b6 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -957,6 +957,11 @@ public class NodeProbe implements AutoCloseable
         compactionProxy.stopCompaction(string);
     }
 
+    public void stopById(String compactionId)
+    {
+        compactionProxy.stopCompactionById(compactionId);
+    }
+
     public void setStreamThroughput(int value)
     {
         ssProxy.setStreamThroughputMbPerSec(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
index 154ef49..e57d2ee 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CompactionStats.java
@@ -52,9 +52,9 @@ public class CompactionStats extends NodeToolCmd
         {
             int compactionThroughput = probe.getCompactionThroughput();
             List<String[]> lines = new ArrayList<>();
-            int[] columnSizes = new int[] { 0, 0, 0, 0, 0, 0, 0 };
+            int[] columnSizes = new int[] { 0, 0, 0, 0, 0, 0, 0, 0 };
 
-            addLine(lines, columnSizes, "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
+            addLine(lines, columnSizes, "id", "compaction type", "keyspace", "table", "completed", "total", "unit", "progress");
             for (Map<String, String> c : compactions)
             {
                 long total = Long.parseLong(c.get("total"));
@@ -66,7 +66,8 @@ public class CompactionStats extends NodeToolCmd
                 String totalStr = humanReadable ? FileUtils.stringifyFileSize(total) : Long.toString(total);
                 String unit = c.get("unit");
                 String percentComplete = total == 0 ? "n/a" : new DecimalFormat("0.00").format((double) completed / total * 100) + "%";
-                addLine(lines, columnSizes, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
+                String id = c.get("compactionId");
+                addLine(lines, columnSizes, id, taskType, keyspace, columnFamily, completedStr, totalStr, unit, percentComplete);
                 if (taskType.equals(OperationType.COMPACTION.toString()))
                     remainingBytes += total - completed;
             }
@@ -82,7 +83,7 @@ public class CompactionStats extends NodeToolCmd
 
             for (String[] line : lines)
             {
-                System.out.printf(format, line[0], line[1], line[2], line[3], line[4], line[5], line[6]);
+                System.out.printf(format, line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7]);
             }
 
             String remainingTime = "n/a";

http://git-wip-us.apache.org/repos/asf/cassandra/blob/52dc63b6/src/java/org/apache/cassandra/tools/nodetool/Stop.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Stop.java b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
index b3bb2b8..ad1fc27 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Stop.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Stop.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.tools.nodetool;
 
 import io.airlift.command.Arguments;
 import io.airlift.command.Command;
+import io.airlift.command.Option;
 
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.tools.NodeProbe;
@@ -27,12 +28,24 @@ import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 @Command(name = "stop", description = "Stop compaction")
 public class Stop extends NodeToolCmd
 {
-    @Arguments(title = "compaction_type", usage = "<compaction type>", description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD", required = true)
+    @Arguments(title = "compaction_type",
+              usage = "<compaction type>",
+              description = "Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, VERIFY, INDEX_BUILD",
+              required = false)
     private OperationType compactionType = OperationType.UNKNOWN;
 
+    @Option(title = "compactionId",
+           name = {"-id", "--compaction-id"},
+           description = "Use -id to stop a compaction by the specified id. Ids can be found in the system.compactions_in_progress table.",
+           required = false)
+    private String compactionId = "";
+
     @Override
     public void execute(NodeProbe probe)
     {
-        probe.stop(compactionType.name());
+        if (!compactionId.isEmpty())
+            probe.stopById(compactionId);
+        else
+            probe.stop(compactionType.name());
     }
 }
\ No newline at end of file


Mime
View raw message