cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bdeggles...@apache.org
Subject cassandra git commit: Rework CSM.getScanners synchronization
Date Tue, 12 Sep 2017 20:34:03 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 37771f31b -> 7d4d1a325


Rework CSM.getScanners synchronization

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13786


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7d4d1a32
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7d4d1a32
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7d4d1a32

Branch: refs/heads/trunk
Commit: 7d4d1a32581ff40ed1049833631832054bcf2316
Parents: 37771f3
Author: Blake Eggleston <bdeggleston@gmail.com>
Authored: Thu Aug 24 13:00:44 2017 -0700
Committer: Blake Eggleston <bdeggleston@gmail.com>
Committed: Tue Sep 12 13:31:47 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 21 +++-----
 .../compaction/CompactionStrategyManager.java   | 56 ++++++++++++++++----
 .../db/compaction/PendingRepairManager.java     |  9 +---
 4 files changed, 53 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a5dc68d..ebe0dc0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Rework CompactionStrategyManager.getScanners synchronization (CASSANDRA-13786)
  * Add additional unit tests for batch behavior, TTLs, Timestamps (CASSANDRA-13846)
  * Add keyspace and table name in schema validation exception (CASSANDRA-13845)
  * Emit metrics whenever we hit tombstone failures and warn thresholds (CASSANDRA-13771)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 722a5d0..5619da7 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -642,17 +642,12 @@ public class CompactionManager implements CompactionManagerMBean
             logger.info("{} Starting anticompaction for {}.{} on {}/{} sstables", PreviewKind.NONE.logPrefix(parentRepairSession), cfs.keyspace.getName(), cfs.getTableName(), validatedForRepair.size(), cfs.getLiveSSTables().size());
             logger.trace("{} Starting anticompaction for ranges {}", PreviewKind.NONE.logPrefix(parentRepairSession), ranges);
             Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
-            Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();
-            // we should only notify that repair status changed if it actually did:
-            Set<SSTableReader> mutatedRepairStatusToNotify = new HashSet<>();
-            Map<SSTableReader, Boolean> wasRepairedBefore = new HashMap<>();
-            for (SSTableReader sstable : sstables)
-                wasRepairedBefore.put(sstable, sstable.isRepaired());
 
             Set<SSTableReader> nonAnticompacting = new HashSet<>();
 
             Iterator<SSTableReader> sstableIterator = sstables.iterator();
             List<Range<Token>> normalizedRanges = Range.normalize(ranges);
+            Set<SSTableReader> fullyContainedSSTables = new HashSet<>();
 
             while (sstableIterator.hasNext())
             {
@@ -667,11 +662,7 @@ public class CompactionManager implements CompactionManagerMBean
                     if (r.contains(sstableRange))
                     {
                         logger.info("{} SSTable {} fully contained in range {}, mutating repairedAt instead of anticompacting", PreviewKind.NONE.logPrefix(parentRepairSession), sstable, r);
-                        sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
-                        sstable.reloadSSTableMetadata();
-                        mutatedRepairStatuses.add(sstable);
-                        if (!wasRepairedBefore.get(sstable))
-                            mutatedRepairStatusToNotify.add(sstable);
+                        fullyContainedSSTables.add(sstable);
                         sstableIterator.remove();
                         shouldAnticompact = true;
                         break;
@@ -690,10 +681,10 @@ public class CompactionManager implements CompactionManagerMBean
                     sstableIterator.remove();
                 }
             }
-            cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(mutatedRepairStatuses));
-            cfs.getTracker().notifySSTableRepairedStatusChanged(mutatedRepairStatusToNotify);
-            txn.cancel(Sets.union(nonAnticompacting, mutatedRepairStatuses));
-            validatedForRepair.release(Sets.union(nonAnticompacting, mutatedRepairStatuses));
+            cfs.metric.bytesMutatedAnticompaction.inc(SSTableReader.getTotalBytes(fullyContainedSSTables));
+            cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, repairedAt, pendingRepair);
+            txn.cancel(Sets.union(nonAnticompacting, fullyContainedSSTables));
+            validatedForRepair.release(Sets.union(nonAnticompacting, fullyContainedSSTables));
             assert txn.originals().equals(sstables);
             if (!sstables.isEmpty())
                 doAntiCompaction(cfs, ranges, txn, repairedAt, pendingRepair);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index 3b1bc41..9192b70 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db.compaction;
 
 
+import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -754,22 +755,22 @@ public class CompactionStrategyManager implements INotificationConsumer
             unrepairedSSTables.add(new HashSet<>());
         }
 
-        for (SSTableReader sstable : sstables)
-        {
-            int idx = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
-            if (sstable.isPendingRepair())
-                pendingSSTables.get(idx).add(sstable);
-            else if (sstable.isRepaired())
-                repairedSSTables.get(idx).add(sstable);
-            else
-                unrepairedSSTables.get(idx).add(sstable);
-        }
-
         List<ISSTableScanner> scanners = new ArrayList<>(sstables.size());
 
         readLock.lock();
         try
         {
+            for (SSTableReader sstable : sstables)
+            {
+                int idx = getCompactionStrategyIndex(cfs, getDirectories(), sstable);
+                if (sstable.isPendingRepair())
+                    pendingSSTables.get(idx).add(sstable);
+                else if (sstable.isRepaired())
+                    repairedSSTables.get(idx).add(sstable);
+                else
+                    unrepairedSSTables.get(idx).add(sstable);
+            }
+
             for (int i = 0; i < pendingSSTables.size(); i++)
             {
                 if (!pendingSSTables.get(i).isEmpty())
@@ -1163,4 +1164,37 @@ public class CompactionStrategyManager implements INotificationConsumer
     {
         return pendingRepairs;
     }
+
+    /**
+     * Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races
+     * with other processes between when the metadata is changed and when sstables are moved between strategies.
+     */
+    public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, UUID pendingRepair) throws IOException
+    {
+        Set<SSTableReader> changed = new HashSet<>();
+
+        writeLock.lock();
+        try
+        {
+            for (SSTableReader sstable: sstables)
+            {
+                sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, pendingRepair);
+                sstable.reloadSSTableMetadata();
+                changed.add(sstable);
+            }
+        }
+        finally
+        {
+            try
+            {
+                // if there was an exception mutating repairedAt, we should still notify for the
+                // sstables that we were able to modify successfully before releasing the lock
+                cfs.getTracker().notifySSTableRepairedStatusChanged(changed);
+            }
+            finally
+            {
+                writeLock.unlock();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7d4d1a32/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index 8ee6025..98acbdb 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -436,18 +436,11 @@ class PendingRepairManager
             try
             {
                 logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID);
-                for (SSTableReader sstable : transaction.originals())
-                {
-                    sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
-                    sstable.reloadSSTableMetadata();
-                }
+                cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
                 completed = true;
             }
             finally
             {
-                // even if we weren't able to rewrite all the sstable metedata, we should still move the ones that were
-                cfs.getTracker().notifySSTableRepairedStatusChanged(transaction.originals());
-
                 // we always abort because mutating metadata isn't guarded by LifecycleTransaction, so this won't roll
                 // anything back. Also, we don't want to obsolete the originals. We're only using it to prevent other
                 // compactions from marking these sstables compacting, and unmarking them when we're done


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message