cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [2/3] git commit: Snapshot only related SSTables when sequential repair
Date Tue, 15 Apr 2014 22:34:04 GMT
Snapshot only related SSTables when sequential repair

patch by yukim; reviewed by jmckenzie for CASSANDRA-7024


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/de8a479f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/de8a479f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/de8a479f

Branch: refs/heads/trunk
Commit: de8a479f2e1a8b536dedf2e6470301709bc3d9dc
Parents: b69f5e3
Author: Yuki Morishita <yukim@apache.org>
Authored: Tue Apr 15 17:13:45 2014 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Tue Apr 15 17:13:45 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 18 ++++++-
 .../repair/RepairMessageVerbHandler.java        | 33 +++++++++---
 .../apache/cassandra/repair/SnapshotTask.java   |  8 +--
 .../repair/messages/RepairMessage.java          |  3 +-
 .../repair/messages/SnapshotMessage.java        | 53 ++++++++++++++++++++
 6 files changed, 100 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 592eef9..9f34023 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -45,6 +45,7 @@
  * Add failure handler to async callback (CASSANDRA-6747)
  * Fix AE when closing SSTable without releasing reference (CASSANDRA-7000)
  * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924)
+ * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024)
 Merged from 2.0:
  * Put nodes in hibernate when join_ring is false (CASSANDRA-6961)
  * Allow compaction of system tables during startup (CASSANDRA-6913)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index ffea243..923ea5b 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -30,6 +30,7 @@ import javax.management.*;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Predicate;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import com.google.common.util.concurrent.Futures;
@@ -2153,6 +2154,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
     public void snapshotWithoutFlush(String snapshotName)
     {
+        snapshotWithoutFlush(snapshotName, null);
+    }
+
+    public void snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate)
+    {
         for (ColumnFamilyStore cfs : concatWithIndexes())
         {
             DataTracker.View currentView = cfs.markCurrentViewReferenced();
@@ -2161,6 +2167,11 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             {
                 for (SSTableReader ssTable : currentView.sstables)
                 {
+                    if (predicate != null && !predicate.apply(ssTable))
+                    {
+                        continue;
+                    }
+
                     File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName);
                     ssTable.createLinks(snapshotDirectory.getPath()); // hard links
                     if (logger.isDebugEnabled())
@@ -2190,8 +2201,13 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      */
     public void snapshot(String snapshotName)
     {
+        snapshot(snapshotName, null);
+    }
+
+    public void snapshot(String snapshotName, Predicate<SSTableReader> predicate)
+    {
         forceBlockingFlush();
-        snapshotWithoutFlush(snapshotName);
+        snapshotWithoutFlush(snapshotName, predicate);
     }
 
     public boolean snapshotExists(String snapshotName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index bb66b69..d710652 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -18,30 +18,32 @@
 package org.apache.cassandra.repair;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Future;
 
+import com.google.common.base.Predicate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.repair.messages.AnticompactionRequest;
-import org.apache.cassandra.repair.messages.PrepareMessage;
-import org.apache.cassandra.repair.messages.RepairMessage;
-import org.apache.cassandra.repair.messages.SyncRequest;
-import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * Handles all repair related message.
  *
@@ -71,6 +73,21 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                 break;
 
+            case SNAPSHOT:
+                ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                final Range<Token> repairingRange = desc.range;
+                cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
+                {
+                    public boolean apply(SSTableReader sstable)
+                    {
+                        return sstable != null && new Bounds<>(sstable.first.token, sstable.last.token).intersects(Collections.singleton(repairingRange));
+                    }
+                });
+
+                logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
+                MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                break;
+
             case VALIDATION_REQUEST:
                 ValidationRequest validationRequest = (ValidationRequest) message.payload;
                 // trigger read-only compaction

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/SnapshotTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java
index cb5003a..6c3afb1 100644
--- a/src/java/org/apache/cassandra/repair/SnapshotTask.java
+++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java
@@ -18,15 +18,14 @@
 package org.apache.cassandra.repair;
 
 import java.net.InetAddress;
-import java.util.List;
 import java.util.concurrent.RunnableFuture;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
-import org.apache.cassandra.db.SnapshotCommand;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.SnapshotMessage;
 
 /**
  * SnapshotTask is a task that sends snapshot request.
@@ -44,10 +43,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl
 
     public void run()
     {
-        MessagingService.instance().sendRRWithFailure(new SnapshotCommand(desc.keyspace,
-                desc.columnFamily,
-                desc.sessionId.toString(),
-                false).createMessage(),
+        MessagingService.instance().sendRRWithFailure(new SnapshotMessage(desc).createMessage(),
                 endpoint,
                 new SnapshotCallback(this));
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
index 054fb55..d500928 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairMessage.java
@@ -44,7 +44,8 @@ public abstract class RepairMessage
         SYNC_REQUEST(2, SyncRequest.serializer),
         SYNC_COMPLETE(3, SyncComplete.serializer),
         ANTICOMPACTION_REQUEST(4, AnticompactionRequest.serializer),
-        PREPARE_MESSAGE(5, PrepareMessage.serializer);
+        PREPARE_MESSAGE(5, PrepareMessage.serializer),
+        SNAPSHOT(6, SnapshotMessage.serializer);
 
         private final byte type;
         private final MessageSerializer<RepairMessage> serializer;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/de8a479f/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
new file mode 100644
index 0000000..caccc82
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/messages/SnapshotMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.messages;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.repair.RepairJobDesc;
+
+public class SnapshotMessage extends RepairMessage
+{
+    public final static MessageSerializer serializer = new SnapshotMessageSerializer();
+
+    public SnapshotMessage(RepairJobDesc desc)
+    {
+        super(Type.SNAPSHOT, desc);
+    }
+
+    public static class SnapshotMessageSerializer implements MessageSerializer<SnapshotMessage>
+    {
+        public void serialize(SnapshotMessage message, DataOutputPlus out, int version) throws IOException
+        {
+            RepairJobDesc.serializer.serialize(message.desc, out, version);
+        }
+
+        public SnapshotMessage deserialize(DataInput in, int version) throws IOException
+        {
+            RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version);
+            return new SnapshotMessage(desc);
+        }
+
+        public long serializedSize(SnapshotMessage message, int version)
+        {
+            return RepairJobDesc.serializer.serializedSize(message.desc, version);
+        }
+    }
+}


Mime
View raw message