cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [1/3] cassandra git commit: Wait for anticompaction to finish
Date Fri, 08 May 2015 22:54:09 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 def6b5fa1 -> 93478ab46
  refs/heads/trunk 40a7e8606 -> a583f70ee


Wait for anticompaction to finish

patch by yukim; reviewed by marcuse for CASSANDRA-9097


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/93478ab4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/93478ab4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/93478ab4

Branch: refs/heads/cassandra-2.1
Commit: 93478ab46c88d6fd198db67ed3ba25251cc30c8c
Parents: def6b5f
Author: Yuki Morishita <yukim@apache.org>
Authored: Fri May 8 13:10:06 2015 -0500
Committer: Yuki Morishita <yukim@apache.org>
Committed: Fri May 8 17:25:44 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java | 31 +++++++
 .../cassandra/repair/AnticompactionTask.java    | 93 ++++++++++++++++++++
 .../repair/RepairMessageVerbHandler.java        | 14 ++-
 .../cassandra/service/ActiveRepairService.java  | 14 +--
 .../cassandra/service/StorageService.java       | 20 +++--
 .../apache/cassandra/utils/SemanticVersion.java |  2 +-
 7 files changed, 161 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a9b34a..5b7843a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,7 @@
    is modified (CASSANDRA-9148, CASSANDRA-9192)
  * Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261)
  * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151)
+ * Repair waits for anticompaction to finish (CASSANDRA-9097)
 Merged from 2.0:
  * Include keyspace and table name in error log for collections over the size
    limit (CASSANDRA-9286)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 20d5387..5beb709 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -546,6 +546,37 @@ public class SystemKeyspace
     }
 
     /**
+     * Get release version for given endpoint.
+     * If release version is unknown, then this returns null.
+     *
+     * @param ep endpoint address to check
+     * @return Release version or null if version is unknown.
+     */
+    public static SemanticVersion getReleaseVersion(InetAddress ep)
+    {
+        try
+        {
+            if (FBUtilities.getBroadcastAddress().equals(ep))
+            {
+                return new SemanticVersion(FBUtilities.getReleaseVersionString());
+            }
+            String req = "SELECT release_version FROM system.%s WHERE peer=?";
+            UntypedResultSet result = executeInternal(String.format(req, PEERS_CF), ep);
+            if (result != null && result.one().has("release_version"))
+            {
+                return new SemanticVersion(result.one().getString("release_version"));
+            }
+            // version is unknown
+            return null;
+        }
+        catch (IllegalArgumentException e)
+        {
+            // version string cannot be parsed
+            return null;
+        }
+    }
+
+    /**
      * One of three things will happen if you try to read the system keyspace:
      * 1. files are present and you can read them: great
      * 2. no files are there: great (new node is assumed)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
new file mode 100644
index 0000000..e505d91
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.AbstractFuture;
+
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.net.IAsyncCallbackWithFailure;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.AnticompactionRequest;
+import org.apache.cassandra.utils.SemanticVersion;
+
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+{
+    /*
+     * Version that anticompaction response is not supported up to.
+     * If Cassandra version is more than this, we need to wait for anticompaction response.
+     */
+    private static final SemanticVersion VERSION_CHECKER = new SemanticVersion("2.1.5");
+
+    private final UUID parentSession;
+    private final InetAddress neighbor;
+
+    public AnticompactionTask(UUID parentSession, InetAddress neighbor)
+    {
+        this.parentSession = parentSession;
+        this.neighbor = neighbor;
+    }
+
+    public void run()
+    {
+        AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+        SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
+        if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+        {
+            MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+        }
+        else
+        {
+            MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+            // immediately return after sending request
+            set(neighbor);
+        }
+    }
+
+    /**
+     * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
+     */
+    public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+    {
+        final AnticompactionTask task;
+
+        public AnticompactionCallback(AnticompactionTask task)
+        {
+            this.task = task;
+        }
+
+        public void response(MessageIn msg)
+        {
+            task.set(msg.from);
+        }
+
+        public boolean isLatencyForSnitch()
+        {
+            return false;
+        }
+
+        public void onFailure(InetAddress from)
+        {
+            task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 5b25afa..60b2243 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -24,6 +24,8 @@ import java.util.UUID;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Predicate;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +55,7 @@ import org.apache.cassandra.utils.Pair;
 public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
 {
     private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class);
-    public void doVerb(MessageIn<RepairMessage> message, int id)
+    public void doVerb(final MessageIn<RepairMessage> message, final int id)
     {
         // TODO add cancel/interrupt message
         RepairJobDesc desc = message.payload.desc;
@@ -112,7 +114,15 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 case ANTICOMPACTION_REQUEST:
                     logger.debug("Got anticompaction request");
                     AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
-                    ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+                    ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+                    compactionDone.addListener(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                        }
+                    }, MoreExecutors.sameThreadExecutor());
                     break;
 
                 default:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 5cc26ed..8d3563c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -50,7 +50,6 @@ import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.*;
-import org.apache.cassandra.repair.messages.AnticompactionRequest;
 import org.apache.cassandra.repair.messages.PrepareMessage;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncComplete;
@@ -330,21 +329,24 @@ public class ActiveRepairService
      * @throws InterruptedException
      * @throws ExecutionException
      */
-    public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException
+    public synchronized ListenableFuture<?> finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException
     {
         if (doAntiCompaction)
         {
+            List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1);
             for (InetAddress neighbor : neighbors)
             {
-                AnticompactionRequest acr = new AnticompactionRequest(parentSession);
-                MessageOut<RepairMessage> req = acr.createMessage();
-                MessagingService.instance().sendOneWay(req, neighbor);
+                AnticompactionTask task = new AnticompactionTask(parentSession, neighbor);
+                tasks.add(task);
+                task.run(); // 'run' is just sending message
             }
-            doAntiCompaction(parentSession).get();
+            tasks.add(doAntiCompaction(parentSession));
+            return Futures.successfulAsList(tasks);
         }
         else
         {
             removeParentRepairSession(parentSession);
+            return Futures.immediateFuture(null);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f9259c..b6c6ecf 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -142,6 +142,8 @@ import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.OutputHandler;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
+
+import com.google.common.util.concurrent.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,9 +165,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
  * This abstraction contains the token/identifier of this node
@@ -2949,9 +2948,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 }
                 if (!fullRepair)
                 {
-                    ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful);
+                    ListenableFuture future = ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful);
+                    future.addListener(new Runnable()
+                    {
+                        @Override
+                        public void run()
+                        {
+                            sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
+                        }
+                    }, MoreExecutors.sameThreadExecutor());
+                }
+                else
+                {
+                    sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
                 }
-                sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
             }
         }, null);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/utils/SemanticVersion.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/SemanticVersion.java b/src/java/org/apache/cassandra/utils/SemanticVersion.java
index 694f09f..858029d 100644
--- a/src/java/org/apache/cassandra/utils/SemanticVersion.java
+++ b/src/java/org/apache/cassandra/utils/SemanticVersion.java
@@ -32,7 +32,7 @@ import com.google.common.base.Objects;
  */
 public class SemanticVersion implements Comparable<SemanticVersion>
 {
-    private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)\\.(\\d+)(\\-[.\\w]+)?(\\+[.\\w]+)?";
+    private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)\\.(\\d+)(\\-[.\\w]+)?([.+][.\\w]+)?";
     private static final Pattern pattern = Pattern.compile(VERSION_REGEXP);
 
     public final int major;


Mime
View raw message