cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From alek...@apache.org
Subject [1/3] git commit: Fix hint replay with many accumulated expired hints
Date Thu, 16 Oct 2014 16:19:43 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk bb6b0d64e -> 96f40e66b


Fix hint replay with many accumulated expired hints

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for
CASSANDRA-6998


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/98bcf402
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/98bcf402
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/98bcf402

Branch: refs/heads/trunk
Commit: 98bcf40226db7c8dbafa6eacee9ce5ef3feaeb1b
Parents: 19c2f54
Author: Aleksey Yeschenko <aleksey@apache.org>
Authored: Thu Oct 16 19:04:33 2014 +0300
Committer: Aleksey Yeschenko <aleksey@apache.org>
Committed: Thu Oct 16 19:05:59 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/HintedHandOffManager.java      | 56 +++++++++++---------
 .../cassandra/service/StorageService.java       |  2 +-
 .../apache/cassandra/db/HintedHandOffTest.java  |  7 +--
 4 files changed, 33 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 967b90c..158a48b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.11:
+ * Fix hint replay with many accumulated expired hints (CASSANDRA-6998)
  * Fix duplicate results in DISTINCT queries on static columns with query
    paging (CASSANDRA-8108)
  * Properly validate ascii and utf8 string literals in CQL queries (CASSANDRA-8101)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/src/java/org/apache/cassandra/db/HintedHandOffManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
index f6bb033..a6b6d4c 100644
--- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java
+++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -216,6 +216,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                 {
                     logger.info("Deleting any stored hints for {}", endpoint);
                     rm.apply();
+                    hintStore.forceBlockingFlush();
                     compact();
                 }
                 catch (Exception e)
@@ -250,13 +251,20 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     }
 
     @VisibleForTesting
-    protected Future<?> compact()
+    protected void compact()
     {
-        hintStore.forceBlockingFlush();
-        ArrayList<Descriptor> descriptors = new ArrayList<Descriptor>();
+        ArrayList<Descriptor> descriptors = new ArrayList<>();
         for (SSTable sstable : hintStore.getDataTracker().getUncompactingSSTables())
             descriptors.add(sstable.descriptor);
-        return CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int)
(System.currentTimeMillis() / 1000));
+
+        try
+        {
+            CompactionManager.instance.submitUserDefined(hintStore, descriptors, (int) (System.currentTimeMillis()
/ 1000)).get();
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
     }
 
     private static boolean pagingFinished(ColumnFamily hintColumnFamily, ByteBuffer startColumn)
@@ -333,9 +341,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
     /*
      * 1. Get the key of the endpoint we need to handoff
      * 2. For each column, deserialize the mutation and send it to the endpoint
-     * 3. Delete the subcolumn if the write was successful
+     * 3. Delete the column if the write was successful
      * 4. Force a flush
-     * 5. Do major compaction to clean up all deletes etc.
      */
     private void doDeliverHintsToEndpoint(InetAddress endpoint)
     {
@@ -357,7 +364,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
                            / (StorageService.instance.getTokenMetadata().getAllEndpoints().size()
- 1);
         RateLimiter rateLimiter = RateLimiter.create(throttleInKB == 0 ? Double.MAX_VALUE
: throttleInKB * 1024);
 
-        boolean finished = false;
         delivery:
         while (true)
         {
@@ -375,7 +381,6 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             if (pagingFinished(hintsPage, startColumn))
             {
                 logger.info("Finished hinted handoff of {} rows to endpoint {}", rowsReplayed,
endpoint);
-                finished = true;
                 break;
             }
 
@@ -469,17 +474,8 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             }
         }
 
-        if (finished || rowsReplayed.get() >= DatabaseDescriptor.getTombstoneWarnThreshold())
-        {
-            try
-            {
-                compact().get();
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
+        // Flush all the tombstones to disk
+        hintStore.forceBlockingFlush();
     }
 
     // read less columns (mutations) per page if they are very large
@@ -503,8 +499,12 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
      */
     private void scheduleAllDeliveries()
     {
-        if (logger.isDebugEnabled())
-          logger.debug("Started scheduleAllDeliveries");
+        logger.debug("Started scheduleAllDeliveries");
+
+        // Force a major compaction to get rid of the tombstones and expired hints. Do it
once, before we schedule any
+        // individual replay, to avoid N - 1 redundant individual compactions (when N is
the number of nodes with hints
+        // to deliver to).
+        compact();
 
         IPartitioner p = StorageService.getPartitioner();
         RowPosition minPos = p.getMinimumToken().minKeyBound();
@@ -517,11 +517,10 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             InetAddress target = StorageService.instance.getTokenMetadata().getEndpointForHostId(hostId);
             // token may have since been removed (in which case we have just read back a
tombstone)
             if (target != null)
-                scheduleHintDelivery(target);
+                scheduleHintDelivery(target, false);
         }
 
-        if (logger.isDebugEnabled())
-          logger.debug("Finished scheduleAllDeliveries");
+        logger.debug("Finished scheduleAllDeliveries");
     }
 
     /*
@@ -529,7 +528,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
      * When we learn that some endpoint is back up we deliver the data
      * to him via an event driven mechanism.
     */
-    public void scheduleHintDelivery(final InetAddress to)
+    public void scheduleHintDelivery(final InetAddress to, final boolean precompact)
     {
         // We should not deliver hints to the same host in 2 different threads
         if (!queuedDeliveries.add(to))
@@ -543,6 +542,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
             {
                 try
                 {
+                    // If it's an individual node hint replay (triggered by Gossip or via
JMX), and not the global scheduled replay
+                    // (every 10 minutes), force a major compaction to get rid of the tombstones
and expired hints.
+                    if (precompact)
+                        compact();
+
                     deliverHintsToEndpoint(to);
                 }
                 finally
@@ -555,7 +559,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean
 
     public void scheduleHintDelivery(String to) throws UnknownHostException
     {
-        scheduleHintDelivery(InetAddress.getByName(to));
+        scheduleHintDelivery(InetAddress.getByName(to), true);
     }
 
     public void pauseHintsDelivery(boolean b)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 0cd4be5..cca6f79 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1977,7 +1977,7 @@ public class StorageService extends NotificationBroadcasterSupport implements
IE
 
         if (tokenMetadata.isMember(endpoint))
         {
-            HintedHandOffManager.instance.scheduleHintDelivery(endpoint);
+            HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true);
             for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers)
                 subscriber.onUp(endpoint);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/98bcf402/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
index 9ffd702..302a1e7 100644
--- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
+++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java
@@ -23,7 +23,6 @@ package org.apache.cassandra.db;
 import java.net.InetAddress;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.Test;
 
@@ -31,12 +30,10 @@ import com.google.common.collect.Iterators;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.UntypedResultSet;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
@@ -75,9 +72,7 @@ public class HintedHandOffTest extends SchemaLoader
         assertEquals(1, hintStore.getSSTables().size());
 
         // submit compaction
-        FBUtilities.waitOnFuture(HintedHandOffManager.instance.compact());
-        while (CompactionManager.instance.getPendingTasks() > 0 || CompactionManager.instance.getActiveCompactions()
> 0)
-            TimeUnit.SECONDS.sleep(1);
+        HintedHandOffManager.instance.compact();
 
         // single row should not be removed because of gc_grace_seconds
         // is 10 hours and there are no any tombstones in sstable


Mime
View raw message