cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [4/4] cassandra git commit: Wait for all repair sessions to finish
Date Thu, 13 Nov 2014 18:56:11 GMT
Wait for all repair sessions to finish

patch by yukim; reviewed by krummas for CASSANDRA-8208


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2808b1d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2808b1d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2808b1d

Branch: refs/heads/trunk
Commit: b2808b1dcea1158511421f947660f03d583e84b0
Parents: f4456a2
Author: Yuki Morishita <yukim@apache.org>
Authored: Mon Nov 3 14:17:37 2014 -0600
Committer: Yuki Morishita <yukim@apache.org>
Committed: Thu Nov 13 12:50:10 2014 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../repair/RepairMessageVerbHandler.java        |  2 +-
 .../apache/cassandra/repair/RepairResult.java   |  3 ++
 .../apache/cassandra/repair/RepairSession.java  |  6 +--
 .../cassandra/repair/RepairSessionResult.java   | 43 ++++++++++++++++++++
 .../repair/messages/AnticompactionRequest.java  | 26 ++++++++++--
 .../cassandra/service/ActiveRepairService.java  | 16 +++++---
 .../cassandra/service/StorageService.java       | 30 +++++++++-----
 8 files changed, 104 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d656faf..82fbbc5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -30,7 +30,7 @@
  * Use unsafe mutations for most unit tests (CASSANDRA-6969)
  * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
  * Fail on very large batch sizes (CASSANDRA-8011)
- * improve concurrency of repair (CASSANDRA-6455)
+ * improve concurrency of repair (CASSANDRA-6455, 8208)
 
 2.1.3
  * Support for frozen collections (CASSANDRA-7859)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 2e96ee3..1880e8e 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -124,7 +124,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                 logger.debug("Got anticompaction request {}", anticompactionRequest);
                 try
                 {
-                    List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+                    List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges);
                     FBUtilities.waitOnFutures(futures);
                 }
                 catch (Exception e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairResult.java b/src/java/org/apache/cassandra/repair/RepairResult.java
index 259d5f3..333b48a 100644
--- a/src/java/org/apache/cassandra/repair/RepairResult.java
+++ b/src/java/org/apache/cassandra/repair/RepairResult.java
@@ -19,6 +19,9 @@ package org.apache.cassandra.repair;
 
 import java.util.List;
 
+/**
+ * RepairJob's result
+ */
 public class RepairResult
 {
     public final RepairJobDesc desc;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index c273c4e..cc46dbe 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -75,7 +75,7 @@ import org.apache.cassandra.utils.Pair;
  * Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle
  * all of them in parallel otherwise.
  */
-public class RepairSession extends AbstractFuture<List<RepairResult>> implements IEndpointStateChangeSubscriber,
+public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber,
                                                                                  IFailureDetectionEventListener
 {
     private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
@@ -223,7 +223,7 @@ public class RepairSession extends AbstractFuture<List<RepairResult>> implements
         if (endpoints.isEmpty())
         {
             logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range));
-            set(Lists.<RepairResult>newArrayList());
+            set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList()));
             return;
         }
 
@@ -255,7 +255,7 @@ public class RepairSession extends AbstractFuture<List<RepairResult>> implements
             {
                 // this repair session is completed
                 logger.info(String.format("[repair #%s] session completed successfully", getId()));
-                set(results);
+                set(new RepairSessionResult(id, keyspace, range, results));
                 taskExecutor.shutdown();
                 // mark this session as terminated
                 terminate();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairSessionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
new file mode 100644
index 0000000..4551608
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.util.Collection;
+import java.util.UUID;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+/**
+ * Repair session result
+ */
+public class RepairSessionResult
+{
+    public final UUID sessionId;
+    public final String keyspace;
+    public final Range<Token> range;
+    public final Collection<RepairResult> repairJobResults;
+
+    public RepairSessionResult(UUID sessionId, String keyspace, Range<Token> range, Collection<RepairResult> repairJobResults)
+    {
+        this.sessionId = sessionId;
+        this.keyspace = keyspace;
+        this.range = range;
+        this.repairJobResults = repairJobResults;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
index 1a13ad1..239ab0e 100644
--- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
+++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java
@@ -19,8 +19,13 @@ package org.apache.cassandra.repair.messages;
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.UUID;
 
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.UUIDSerializer;
 
@@ -28,11 +33,16 @@ public class AnticompactionRequest extends RepairMessage
 {
     public static MessageSerializer serializer = new AnticompactionRequestSerializer();
     public final UUID parentRepairSession;
+    /**
+     * Successfully repaired ranges. Does not contain null.
+     */
+    public final Collection<Range<Token>> successfulRanges;
 
-    public AnticompactionRequest(UUID parentRepairSession)
+    public AnticompactionRequest(UUID parentRepairSession, Collection<Range<Token>> ranges)
     {
         super(Type.ANTICOMPACTION_REQUEST, null);
         this.parentRepairSession = parentRepairSession;
+        this.successfulRanges = ranges;
     }
 
     public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest>
@@ -40,17 +50,27 @@ public class AnticompactionRequest extends RepairMessage
         public void serialize(AnticompactionRequest message, DataOutputPlus out, int version) throws IOException
         {
             UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version);
+            out.writeInt(message.successfulRanges.size());
+            for (Range r : message.successfulRanges)
+                Range.serializer.serialize(r, out, version);
         }
 
         public AnticompactionRequest deserialize(DataInput in, int version) throws IOException
         {
             UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version);
-            return new AnticompactionRequest(parentRepairSession);
+            int rangeCount = in.readInt();
+            List<Range<Token>> ranges = new ArrayList<>(rangeCount);
+            for (int i = 0; i < rangeCount; i++)
+                ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds());
+            return new AnticompactionRequest(parentRepairSession, ranges);
         }
 
         public long serializedSize(AnticompactionRequest message, int version)
         {
-            return UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
+            long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version);
+            for (Range r : message.successfulRanges)
+                size += Range.serializer.serializedSize(r, version);
+            return size;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 763ecdf..3c1cc48 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -257,7 +257,7 @@ public class ActiveRepairService
         for (ColumnFamilyStore cfs : columnFamilyStores)
             cfIds.add(cfs.metadata.cfId);
 
-        for(InetAddress neighbour : endpoints)
+        for (InetAddress neighbour : endpoints)
         {
             PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental());
             MessageOut<RepairMessage> msg = message.createMessage();
@@ -287,17 +287,17 @@ public class ActiveRepairService
         parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis()));
     }
 
-    public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors)
+    public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges)
     {
         try
         {
             for (InetAddress neighbor : neighbors)
             {
-                AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+                AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges);
                 MessageOut<RepairMessage> req = acr.createMessage();
                 MessagingService.instance().sendOneWay(req, neighbor);
             }
-            List<Future<?>> futures = doAntiCompaction(parentSession);
+            List<Future<?>> futures = doAntiCompaction(parentSession, successfulRanges);
             FBUtilities.waitOnFutures(futures);
         }
         finally
@@ -316,12 +316,16 @@ public class ActiveRepairService
         return parentRepairSessions.remove(parentSessionId);
     }
 
-    public List<Future<?>> doAntiCompaction(UUID parentRepairSession)
+    public List<Future<?>> doAntiCompaction(UUID parentRepairSession, Collection<Range<Token>> successfulRanges)
     {
         assert parentRepairSession != null;
         ParentRepairSession prs = getParentRepairSession(parentRepairSession);
+        assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges";
 
         List<Future<?>> futures = new ArrayList<>();
+        // if we don't have successful repair ranges, then just skip anticompaction
+        if (successfulRanges.isEmpty())
+            return futures;
         for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
         {
 
@@ -338,7 +342,7 @@ public class ActiveRepairService
                 success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
             }
 
-            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
+            futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt));
         }
 
         return futures;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 450bc5c..a0b7975 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -30,7 +30,6 @@ import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import javax.annotation.Nullable;
 import javax.management.JMX;
 import javax.management.MBeanServer;
 import javax.management.Notification;
@@ -78,6 +77,7 @@ import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.ResponseVerbHandler;
 import org.apache.cassandra.repair.RepairMessageVerbHandler;
+import org.apache.cassandra.repair.RepairSessionResult;
 import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.repair.RepairResult;
 import org.apache.cassandra.repair.RepairSession;
@@ -2679,7 +2679,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                                                                                         
                                  new NamedThreadFactory("Repair#" + cmd),
                                                                                         
                                  "internal"));
 
-                List<ListenableFuture<?>> futures = new ArrayList<>(options.getRanges().size());
+                List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
                 String[] cfnames = new String[columnFamilyStores.size()];
                 for (int i = 0; i < columnFamilyStores.size(); i++)
                 {
@@ -2698,9 +2698,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                     if (session == null)
                         continue;
                     // After repair session completes, notify client its result
-                    Futures.addCallback(session, new FutureCallback<List<RepairResult>>()
+                    Futures.addCallback(session, new FutureCallback<RepairSessionResult>()
                     {
-                        public void onSuccess(List<RepairResult> results)
+                        public void onSuccess(RepairSessionResult result)
                         {
                             String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString());
                             logger.info(message);
@@ -2719,14 +2719,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
                 // After all repair sessions completes(successful or not),
                 // run anticompaction if necessary and send finish notice back to client
-                ListenableFuture<?> allSessions = Futures.allAsList(futures);
-                Futures.addCallback(allSessions, new FutureCallback<Object>()
+                final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures);
+                Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>()
                 {
-                    public void onSuccess(@Nullable Object result)
+                    public void onSuccess(List<RepairSessionResult> result)
                     {
+                        // filter out null(=failed) results and get successful ranges
+                        Collection<Range<Token>> successfulRanges = new ArrayList<>();
+                        for (RepairSessionResult sessionResult : result)
+                        {
+                            if (sessionResult != null)
+                            {
+                                successfulRanges.add(sessionResult.range);
+                            }
+                        }
                         try
                         {
-                            ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors);
+                            ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges);
                         }
                         catch (Exception e)
                         {
@@ -2742,14 +2751,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
                     private void repairComplete()
                     {
-                        String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true);
+                        String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
+                                                                                  true, true);
                         String message = String.format("Repair command #%d finished in %s", cmd, duration);
                         sendNotification("repair", message,
                                          new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
                         logger.info(message);
                         executor.shutdownNow();
                     }
-                }, MoreExecutors.sameThreadExecutor());
+                });
             }
         }, null);
     }


Mime
View raw message