cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From slebre...@apache.org
Subject [25/50] git commit: Staggering repair patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-3721
Date Fri, 24 Feb 2012 15:49:23 GMT
Staggering repair
patch by Vijay and Sylvain Lebresne; reviewed by Sylvain Lebresne for CASSANDRA-3721


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ddee43e8
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ddee43e8
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ddee43e8

Branch: refs/heads/cassandra-1.1
Commit: ddee43e8463777b0419fac2423a59511202c8fab
Parents: 6642d0f
Author: Vijay Parthasarathy <vijay2win@gmail.com>
Authored: Wed Feb 15 01:48:32 2012 -0800
Committer: Vijay Parthasarathy <vijay2win@gmail.com>
Committed: Wed Feb 15 01:48:32 2012 -0800

----------------------------------------------------------------------
 .../org/apache/cassandra/db/ColumnFamilyStore.java |    8 +
 src/java/org/apache/cassandra/db/Directories.java  |   15 +
 .../cassandra/db/compaction/CompactionManager.java |   37 ++-
 .../cassandra/service/AntiEntropyService.java      |  223 ++++++++++++--
 .../apache/cassandra/service/StorageService.java   |   13 +-
 .../cassandra/service/StorageServiceMBean.java     |    4 +-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    7 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |    8 +-
 .../apache/cassandra/io/CompactSerializerTest.java |    1 +
 9 files changed, 257 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index e4e3204..218fadf 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1423,6 +1423,14 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
+    public List<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException
+    {
+        List<SSTableReader> readers = new ArrayList<SSTableReader>();
+        for (Map.Entry<Descriptor, Set<Component>> entries : directories.sstableLister().snapshots(tag).list().entrySet())
+            readers.add(SSTableReader.open(entries.getKey(), entries.getValue(), metadata, partitioner));
+        return readers;
+    }
+
     /**
      * Take a snap shot of this columnfamily store.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 2afefd2..7c51830 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -185,6 +185,7 @@ public class Directories
         private int nbFiles;
         private final Map<Descriptor, Set<Component>> components = new HashMap<Descriptor, Set<Component>>();
         private boolean filtered;
+        private String snapshotName;
 
         public SSTableLister skipCompacted(boolean b)
         {
@@ -219,6 +220,14 @@ public class Directories
             return this;
         }
 
+        public SSTableLister snapshots(String sn)
+        {
+            if (filtered)
+                throw new IllegalStateException("list() has already been called");
+            snapshotName = sn;
+            return this;
+        }
+
         public Map<Descriptor, Set<Component>> list()
         {
             filter();
@@ -246,6 +255,12 @@ public class Directories
 
             for (File location : sstableDirectories)
             {
+                if (snapshotName != null)
+                {
+                    new File(location, join(SNAPSHOT_SUBDIR, snapshotName)).listFiles(getFilter());
+                    continue;
+                }
+
                 if (!onlyBackups)
                     location.listFiles(getFilter());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f30510b..ed699f2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -807,23 +807,33 @@ public class CompactionManager implements CompactionManagerMBean
         if (!cfs.isValid())
             return;
 
-        // flush first so everyone is validating data that is as similar as possible
-        try
-        {
-            StorageService.instance.forceTableFlush(cfs.table.name, cfs.getColumnFamilyName());
-        }
-        catch (ExecutionException e)
+        Collection<SSTableReader> sstables;
+        if (cfs.table.snapshotExists(validator.request.sessionid))
         {
-            throw new IOException(e);
+            // If there is a snapshot created for the session then read from there.
+            sstables = cfs.getSnapshotSSTableReader(validator.request.sessionid);
         }
-        catch (InterruptedException e)
+        else
         {
-            throw new AssertionError(e);
+            // flush first so everyone is validating data that is as similar as possible
+            try
+            {
+                StorageService.instance.forceTableFlush(cfs.table.name, cfs.getColumnFamilyName());
+            }
+            catch (ExecutionException e)
+            {
+                throw new IOException(e);
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
+            // instead so they won't be cleaned up if they do get compacted during the validation
+            sstables = cfs.markCurrentSSTablesReferenced();
         }
 
-        // we don't mark validating sstables as compacting in DataTracker, so we have to mark them referenced
-        // instead so they won't be cleaned up if they do get compacted during the validation
-        Collection<SSTableReader> sstables = cfs.markCurrentSSTablesReferenced();
         CompactionIterable ci = new ValidationCompactionIterable(cfs, sstables, validator.request.range);
         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
         validationExecutor.beginCompaction(ci);
@@ -846,6 +856,9 @@ public class CompactionManager implements CompactionManagerMBean
         {
             SSTableReader.releaseReferences(sstables);
             iter.close();
+            if (cfs.table.snapshotExists(validator.request.sessionid))
+                cfs.table.clearSnapshot(validator.request.sessionid);
+
             validationExecutor.finishCompaction(ci);
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 812c23a..20c0e8b 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Range;
@@ -47,6 +48,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
 import org.apache.cassandra.net.CompactEndpointSerializationHelper;
+import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -119,9 +121,9 @@ public class AntiEntropyService
     /**
      * Requests repairs for the given table and column families, and blocks until all repairs have been completed.
      */
-    public RepairFuture submitRepairSession(Range<Token> range, String tablename, String... cfnames)
+    public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
     {
-        RepairFuture futureTask = new RepairSession(range, tablename, cfnames).getFuture();
+        RepairFuture futureTask = new RepairSession(range, tablename, isSequential, cfnames).getFuture();
         executor.execute(futureTask);
         return futureTask;
     }
@@ -209,16 +211,6 @@ public class AntiEntropyService
     }
 
     /**
-     * Requests a tree from the given node, and returns the request that was sent.
-     */
-    TreeRequest request(String sessionid, InetAddress remote, Range<Token> range, String ksname, String cfname)
-    {
-        TreeRequest request = new TreeRequest(sessionid, remote, range, new CFPair(ksname, cfname));
-        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(request, Gossiper.instance.getVersion(remote)), remote);
-        return request;
-    }
-
-    /**
      * Responds to the node that requested the given valid tree.
      * @param validator A locally generated validator
      * @param local localhost (parameterized for testing)
@@ -598,6 +590,7 @@ public class AntiEntropyService
     static class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
     {
         private final String sessionName;
+        private final boolean isSequential;
         private final String tablename;
         private final String[] cfnames;
         private final Range<Token> range;
@@ -615,18 +608,19 @@ public class AntiEntropyService
 
         public RepairSession(TreeRequest req, String tablename, String... cfnames)
         {
-            this(req.sessionid, req.range, tablename, cfnames);
+            this(req.sessionid, req.range, tablename, false, cfnames);
             AntiEntropyService.instance.sessions.put(getName(), this);
         }
 
-        public RepairSession(Range<Token> range, String tablename, String... cfnames)
+        public RepairSession(Range<Token> range, String tablename, boolean isSequential, String... cfnames)
         {
-            this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, cfnames);
+            this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, isSequential, cfnames);
         }
 
-        private RepairSession(String id, Range<Token> range, String tablename, String[] cfnames)
+        private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, String[] cfnames)
         {
             this.sessionName = id;
+            this.isSequential = isSequential;
             this.tablename = tablename;
             this.cfnames = cfnames;
             assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@ -674,6 +668,12 @@ public class AntiEntropyService
                     logger.info(String.format("[repair #%s] Cannot proceed on repair because a neighbor (%s) is dead: session failed", getName(), endpoint));
                     return;
                 }
+
+                if (Gossiper.instance.getVersion(endpoint) < MessagingService.VERSION_11 && isSequential)
+                {
+                    logger.info(String.format("[repair #%s] Cannot repair using snapshots as node %s is pre-1.1", getName(), endpoint));
+                    return;
+                }
             }
 
             AntiEntropyService.instance.sessions.put(getName(), this);
@@ -729,6 +729,8 @@ public class AntiEntropyService
         public void terminate()
         {
             terminated = true;
+            for (RepairJob job : jobs)
+                job.terminate();
             jobs.clear();
             activeJobs.clear();
         }
@@ -810,17 +812,32 @@ public class AntiEntropyService
         {
             private final String cfname;
             // first we send tree requests.  this tracks the endpoints remaining to hear from
-            private final Set<InetAddress> remainingEndpoints = new HashSet<InetAddress>();
+            private final RequestCoordinator<TreeRequest> treeRequests;
             // tree responses are then tracked here
             private final List<TreeResponse> trees = new ArrayList<TreeResponse>(endpoints.size() + 1);
             // once all responses are received, each tree is compared with each other, and differencer tasks
             // are submitted.  the job is done when all differencers are complete.
-            private final Set<Differencer> remainingDifferencers = new HashSet<Differencer>();
+            private final RequestCoordinator<Differencer> differencers;
             private final Condition requestsSent = new SimpleCondition();
+            private CountDownLatch snapshotLatch = null;
 
             public RepairJob(String cfname)
             {
                 this.cfname = cfname;
+                this.treeRequests = new RequestCoordinator<TreeRequest>(isSequential)
+                {
+                    public void send(TreeRequest r)
+                    {
+                        MessagingService.instance().sendOneWay(TreeRequestVerbHandler.makeVerb(r, Gossiper.instance.getVersion(r.endpoint)), r.endpoint);
+                    }
+                };
+                this.differencers = new RequestCoordinator<Differencer>(isSequential)
+                {
+                    public void send(Differencer d)
+                    {
+                        StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
+                    }
+                };
             }
 
             /**
@@ -828,17 +845,51 @@ public class AntiEntropyService
              */
             public void sendTreeRequests()
             {
-                remainingEndpoints.addAll(endpoints);
-                remainingEndpoints.add(FBUtilities.getBroadcastAddress());
-
                 // send requests to all nodes
-                for (InetAddress endpoint : remainingEndpoints)
-                    AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname);
+                List<InetAddress> allEndpoints = new ArrayList<InetAddress>(endpoints);
+                allEndpoints.add(FBUtilities.getBroadcastAddress());
 
-                logger.info(String.format("[repair #%s] requests for merkle tree sent for %s (to %s)", getName(), cfname, remainingEndpoints));
+                if (isSequential)
+                    makeSnapshots(endpoints);
+
+                for (InetAddress endpoint : allEndpoints)
+                    treeRequests.add(new TreeRequest(getName(), endpoint, range, new CFPair(tablename, cfname)));
+
+                logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints));
+                treeRequests.start();
                 requestsSent.signalAll();
             }
 
+            public void makeSnapshots(Collection<InetAddress> endpoints)
+            {
+                try
+                {
+                    snapshotLatch = new CountDownLatch(endpoints.size());
+                    IAsyncCallback callback = new IAsyncCallback()
+                    {
+                        @Override
+                            public boolean isLatencyForSnitch()
+                            {
+                                return false;
+                            }
+
+                        @Override
+                            public void response(Message msg)
+                            {
+                                RepairJob.this.snapshotLatch.countDown();
+                            }
+                    };
+                    for (InetAddress endpoint : endpoints)
+                        MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false), endpoint, callback);
+                    snapshotLatch.await();
+                    snapshotLatch = null;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
             /**
              * Add a new received tree and return the number of remaining tree to
              * be received for the job to be complete.
@@ -859,8 +910,7 @@ public class AntiEntropyService
 
                 assert request.cf.right.equals(cfname);
                 trees.add(new TreeResponse(request.endpoint, tree));
-                remainingEndpoints.remove(request.endpoint);
-                return remainingEndpoints.size();
+                return treeRequests.completed(request);
             }
 
             /**
@@ -869,8 +919,6 @@ public class AntiEntropyService
              */
             public void submitDifferencers()
             {
-                assert remainingEndpoints.isEmpty();
-
                 // We need to difference all trees one against another
                 for (int i = 0; i < trees.size() - 1; ++i)
                 {
@@ -880,10 +928,10 @@ public class AntiEntropyService
                         TreeResponse r2 = trees.get(j);
                         Differencer differencer = new Differencer(cfname, r1, r2);
                         logger.debug("Queueing comparison {}", differencer);
-                        remainingDifferencers.add(differencer);
-                        StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer);
+                        differencers.add(differencer);
                     }
                 }
+                differencers.start();
                 trees.clear(); // allows gc to do its thing
             }
 
@@ -892,8 +940,16 @@ public class AntiEntropyService
              */
             synchronized boolean completedSynchronization(Differencer differencer)
             {
-                remainingDifferencers.remove(differencer);
-                return remainingDifferencers.isEmpty();
+                return differencers.completed(differencer) == 0;
+            }
+
+            public void terminate()
+            {
+                if (snapshotLatch != null)
+                {
+                    while (snapshotLatch.getCount() > 0)
+                        snapshotLatch.countDown();
+                }
             }
         }
 
@@ -992,4 +1048,107 @@ public class AntiEntropyService
             this.session = session;
         }
     }
+
+    public static abstract class RequestCoordinator<R>
+    {
+        private final Order<R> orderer;
+
+        protected RequestCoordinator(boolean isSequential)
+        {
+            this.orderer = isSequential ? new SequentialOrder(this) : new ParallelOrder(this);
+        }
+
+        public abstract void send(R request);
+
+        public void add(R request)
+        {
+            orderer.add(request);
+        }
+
+        public void start()
+        {
+            orderer.start();
+        }
+
+        // Returns how many request remains
+        public int completed(R request)
+        {
+            return orderer.completed(request);
+        }
+
+        private static abstract class Order<R>
+        {
+            protected final RequestCoordinator<R> coordinator;
+
+            Order(RequestCoordinator<R> coordinator)
+            {
+                this.coordinator = coordinator;
+            }
+
+            public abstract void add(R request);
+            public abstract void start();
+            public abstract int completed(R request);
+        }
+
+        private static class SequentialOrder<R> extends Order<R>
+        {
+            private final Queue<R> requests = new LinkedList<R>();
+
+            SequentialOrder(RequestCoordinator<R> coordinator)
+            {
+                super(coordinator);
+            }
+
+            public void add(R request)
+            {
+                requests.add(request);
+            }
+
+            public void start()
+            {
+                if (requests.isEmpty())
+                    return;
+
+                coordinator.send(requests.peek());
+            }
+
+            public int completed(R request)
+            {
+                assert request.equals(requests.peek());
+                requests.poll();
+                int remaining = requests.size();
+                if (remaining != 0)
+                    coordinator.send(requests.peek());
+                return remaining;
+            }
+        }
+
+        private static class ParallelOrder<R> extends Order<R>
+        {
+            private final Set<R> requests = new HashSet<R>();
+
+            ParallelOrder(RequestCoordinator<R> coordinator)
+            {
+                super(coordinator);
+            }
+
+            public void add(R request)
+            {
+                requests.add(request);
+            }
+
+            public void start()
+            {
+                for (R request : requests)
+                    coordinator.send(request);
+            }
+
+            public int completed(R request)
+            {
+                requests.remove(request);
+                return requests.size();
+            }
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d06b4a2..ed33b44 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1826,12 +1826,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableRepair(final String tableName, final String... columnFamilies) throws IOException
+    public void forceTableRepair(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
         if (Table.SYSTEM_TABLE.equals(tableName))
             return;
 
-
         Collection<Range<Token>> ranges = getLocalRanges(tableName);
         int cmd = nextRepairCommand.incrementAndGet();
         logger_.info("Starting repair command #{}, repairing {} ranges.", cmd, ranges.size());
@@ -1839,7 +1838,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
         for (Range<Token> range : ranges)
         {
-            AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, columnFamilies);
+            AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
             futures.add(future);
             // wait for a session to be done with its differencing before starting the next one
             try
@@ -1874,12 +1873,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             logger_.info("Repair command #{} completed successfully", cmd);
     }
 
-    public void forceTableRepairPrimaryRange(final String tableName, final String... columnFamilies) throws IOException
+    public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
         if (Table.SYSTEM_TABLE.equals(tableName))
             return;
 
-        AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, columnFamilies);
+        AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, isSequential, columnFamilies);
         try
         {
             future.get();
@@ -1891,7 +1890,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
     }
 
-    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, final String... columnFamilies) throws IOException
+    public AntiEntropyService.RepairFuture forceTableRepair(final Range<Token> range, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
     {
         ArrayList<String> names = new ArrayList<String>();
         for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName, columnFamilies))
@@ -1899,7 +1898,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             names.add(cfStore.getColumnFamilyName());
         }
 
-        return AntiEntropyService.instance.submitRepairSession(range, tableName, names.toArray(new String[names.size()]));
+        return AntiEntropyService.instance.submitRepairSession(range, tableName, isSequential, names.toArray(new String[names.size()]));
     }
 
     public void forceTerminateAllRepairSessions() {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 6af63b7..c5aa9fd 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -231,12 +231,12 @@ public interface StorageServiceMBean
      * @param columnFamilies
      * @throws IOException
      */
-    public void forceTableRepair(String tableName, String... columnFamilies) throws IOException;
+    public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
 
     /**
      * Triggers proactive repair but only for the node primary range.
      */
-    public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException;
+    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException;
 
     public void forceTerminateAllRepairSessions();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index d1648a1..e5ed0b7 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -53,6 +53,7 @@ public class NodeCmd
     private static final Pair<String, String> PASSWORD_OPT = new Pair<String, String>("pw", "password");
     private static final Pair<String, String> TAG_OPT = new Pair<String, String>("t", "tag");
     private static final Pair<String, String> PRIMARY_RANGE_OPT = new Pair<String, String>("pr", "partitioner-range");
+    private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new Pair<String, String>("snapshot", "with-snapshot");
 
     private static final String DEFAULT_HOST = "127.0.0.1";
     private static final int DEFAULT_PORT = 7199;
@@ -71,6 +72,7 @@ public class NodeCmd
         options.addOption(PASSWORD_OPT, true, "remote jmx agent password");
         options.addOption(TAG_OPT,      true, "optional name to give a snapshot");
         options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first range returned by the partitioner for the node");
+        options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a time using snapshots");
     }
     
     public NodeCmd(NodeProbe probe)
@@ -921,10 +923,11 @@ public class NodeCmd
             switch (nc)
             {
                 case REPAIR  :
+                    boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
                     if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
-                        probe.forceTableRepairPrimaryRange(keyspace, columnFamilies);
+                        probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies);
                     else
-                        probe.forceTableRepair(keyspace, columnFamilies);
+                        probe.forceTableRepair(keyspace, snapshot, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 46d4c63..8739745 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -202,14 +202,14 @@ public class NodeProbe
         ssProxy.forceTableFlush(tableName, columnFamilies);
     }
 
-    public void forceTableRepair(String tableName, String... columnFamilies) throws IOException
+    public void forceTableRepair(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
-        ssProxy.forceTableRepair(tableName, columnFamilies);
+        ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
     }
 
-    public void forceTableRepairPrimaryRange(String tableName, String... columnFamilies) throws IOException
+    public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
-        ssProxy.forceTableRepairPrimaryRange(tableName, columnFamilies);
+        ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
     }
 
     public void invalidateKeyCache() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ddee43e8/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/CompactSerializerTest.java b/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
index e8e2068..befb0d1 100644
--- a/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/CompactSerializerTest.java
@@ -70,6 +70,7 @@ public class CompactSerializerTest extends CleanupHelper
         expectedClassNames.add("HashableSerializer");
         expectedClassNames.add("StreamingRepairTaskSerializer");
         expectedClassNames.add("AbstractBoundsSerializer");
+        expectedClassNames.add("SnapshotCommandSerializer");
         
         discoveredClassNames = new ArrayList<String>();
         String cp = System.getProperty("java.class.path");


Mime
View raw message