cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/6] cassandra git commit: Improve TRUNCATE performance
Date Mon, 02 Oct 2017 07:40:47 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 15cee48bb -> b32a9e645
  refs/heads/cassandra-3.11 9d56132ae -> 983c72a84
  refs/heads/trunk cecb2de05 -> 694b3c401


Improve TRUNCATE performance

Patch by marcuse; reviewed by Stefania Alborghetti for CASSANDRA-13909


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b32a9e64
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b32a9e64
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b32a9e64

Branch: refs/heads/cassandra-3.0
Commit: b32a9e6452c78e6ad08e371314bf1ab7492d0773
Parents: 15cee48
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Mon Sep 25 14:44:37 2017 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Mon Oct 2 09:29:22 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/lifecycle/Helpers.java  | 15 +++++
 .../apache/cassandra/db/lifecycle/LogFile.java  | 25 ++++++++
 .../cassandra/db/lifecycle/LogRecord.java       | 65 +++++++++++++++++++-
 .../cassandra/db/lifecycle/LogTransaction.java  | 16 +++++
 .../apache/cassandra/db/lifecycle/Tracker.java  |  2 +-
 .../cassandra/db/lifecycle/HelpersTest.java     | 58 ++++++++++++++++-
 7 files changed, 179 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4a45469..d6423b4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Improve TRUNCATE performance (CASSANDRA-13909)
  * Implement short read protection on partition boundaries (CASSANDRA-13595)
  * Fix ISE thrown by UPI.Serializer.hasNext() for some SELECT queries (CASSANDRA-13911)
  * Filter header only commit logs before recovery (CASSANDRA-13918)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
index f9555f4..b9adc4b 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Helpers.java
@@ -141,6 +141,21 @@ class Helpers
         return accumulate;
     }
 
+    static Throwable prepareForBulkObsoletion(Iterable<SSTableReader> readers, LogTransaction txnLogs, List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
+    {
+        try
+        {
+            for (Map.Entry<SSTableReader, LogTransaction.SSTableTidier> entry : txnLogs.bulkObsoletion(readers).entrySet())
+                obsoletions.add(new LogTransaction.Obsoletion(entry.getKey(), entry.getValue()));
+        }
+        catch (Throwable t)
+        {
+            accumulate = Throwables.merge(accumulate, t);
+        }
+
+        return accumulate;
+    }
+
     static Throwable abortObsoletion(List<LogTransaction.Obsoletion> obsoletions, Throwable accumulate)
     {
         if (obsoletions == null || obsoletions.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
index da5bb39..be26163 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LogRecord.Type;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.utils.Throwables;
 
@@ -284,6 +285,25 @@ final class LogFile implements AutoCloseable
             throw new IllegalStateException();
     }
 
+    public void addAll(Type type, Iterable<SSTableReader> toBulkAdd)
+    {
+        for (LogRecord record : makeRecords(type, toBulkAdd))
+            if (!addRecord(record))
+                throw new IllegalStateException();
+    }
+
+    private Collection<LogRecord> makeRecords(Type type, Iterable<SSTableReader> tables)
+    {
+        assert type == Type.ADD || type == Type.REMOVE;
+
+        for (SSTableReader sstable : tables)
+        {
+            File folder = sstable.descriptor.directory;
+            replicas.maybeCreateReplica(folder, getFileName(folder), records);
+        }
+        return LogRecord.make(type, tables);
+    }
+
     private LogRecord makeRecord(Type type, SSTable table)
     {
         assert type == Type.ADD || type == Type.REMOVE;
@@ -414,4 +434,9 @@ final class LogFile implements AutoCloseable
     {
         return records;
     }
+
+    public boolean isEmpty()
+    {
+        return records.isEmpty();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
index ac6d6d0..a322ea1 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogRecord.java
@@ -21,6 +21,7 @@
 package org.apache.cassandra.db.lifecycle;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.*;
@@ -30,7 +31,9 @@ import java.util.stream.Collectors;
 import java.util.zip.CRC32;
 
 import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -151,10 +154,35 @@ final class LogRecord
         // there is no separator after the generation number, and this would cause files of sstables with
         // a higher generation number that starts with the same number, to be incorrectly classified as files
         // of this record sstable
-        String absoluteTablePath = FileUtils.getCanonicalPath(table.descriptor.baseFilename() + Component.separator);
+        String absoluteTablePath = absolutePath(table.descriptor.baseFilename());
         return make(type, getExistingFiles(absoluteTablePath), table.getAllFilePaths().size(), absoluteTablePath);
     }
 
+    public static Collection<LogRecord> make(Type type, Iterable<SSTableReader> tables)
+    {
+        // contains a mapping from sstable absolute path (everything up until the 'Data'/'Index'/etc part of the filename) to the sstable
+        Map<String, SSTable> absolutePaths = new HashMap<>();
+        for (SSTableReader table : tables)
+            absolutePaths.put(absolutePath(table.descriptor.baseFilename()), table);
+
+        // maps sstable base file name to the actual files on disk
+        Map<String, List<File>> existingFiles = getExistingFiles(absolutePaths.keySet());
+        List<LogRecord> records = new ArrayList<>(existingFiles.size());
+        for (Map.Entry<String, List<File>> entry : existingFiles.entrySet())
+        {
+            List<File> filesOnDisk = entry.getValue();
+            String baseFileName = entry.getKey();
+            SSTable sstable = absolutePaths.get(baseFileName);
+            records.add(make(type, filesOnDisk, sstable.getAllFilePaths().size(), baseFileName));
+        }
+        return records;
+    }
+
+    private static String absolutePath(String baseFilename)
+    {
+        return FileUtils.getCanonicalPath(baseFilename + Component.separator);
+    }
+
     public LogRecord withExistingFiles()
     {
         return make(type, getExistingFiles(), 0, absolutePath.get());
@@ -275,6 +303,41 @@ final class LogRecord
         return files == null ? Collections.emptyList() : Arrays.asList(files);
     }
 
+    /**
+     * absoluteFilePaths contains full file parts up to the component name
+     *
+     * this method finds all files on disk beginning with any of the paths in absoluteFilePaths
+     * @return a map from absoluteFilePath to actual file on disk.
+     */
+    public static Map<String, List<File>> getExistingFiles(Set<String> absoluteFilePaths)
+    {
+        Set<File> uniqueDirectories = absoluteFilePaths.stream().map(path -> Paths.get(path).getParent().toFile()).collect(Collectors.toSet());
+        Map<String, List<File>> fileMap = new HashMap<>();
+        FilenameFilter ff = (dir, name) -> {
+            Descriptor descriptor = null;
+            try
+            {
+                descriptor = Descriptor.fromFilename(dir, name).left;
+            }
+            catch (Throwable t)
+            {// ignored - if we can't parse the filename, just skip the file
+            }
+
+            String absolutePath = descriptor != null ? absolutePath(descriptor.baseFilename()) : null;
+            if (absolutePath != null && absoluteFilePaths.contains(absolutePath))
+                fileMap.computeIfAbsent(absolutePath, k -> new ArrayList<>()).add(new File(dir, name));
+
+            return false;
+        };
+
+        // populate the file map:
+        for (File f : uniqueDirectories)
+            f.listFiles(ff);
+
+        return fileMap;
+    }
+
+
     public boolean isFinal()
     {
         return type.isFinal();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
index 350477c..6599142 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/LogTransaction.java
@@ -163,6 +163,22 @@ class LogTransaction extends Transactional.AbstractTransactional implements Transactional
         return new SSTableTidier(reader, false, this);
     }
 
+    Map<SSTableReader, SSTableTidier> bulkObsoletion(Iterable<SSTableReader> sstables)
+    {
+        if (!txnFile.isEmpty())
+            throw new IllegalStateException("Bad state when doing bulk obsoletions");
+
+        txnFile.addAll(Type.REMOVE, sstables);
+        Map<SSTableReader, SSTableTidier> tidiers = new HashMap<>();
+        for (SSTableReader sstable : sstables)
+        {
+            if (tracker != null)
+                tracker.notifyDeleting(sstable);
+            tidiers.put(sstable, new SSTableTidier(sstable, false, this));
+        }
+        return tidiers;
+    }
+
     OperationType type()
     {
         return txnFile.type();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
index 9feaa3e..d281278 100644
--- a/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/Tracker.java
@@ -245,7 +245,7 @@ public class Tracker
             // It is important that any method accepting/returning a Throwable never throws an exception, and does its best
             // to complete the instructions given to it
             List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-            accumulate = prepareForObsoletion(removed, txnLogs, obsoletions, accumulate);
+            accumulate = prepareForBulkObsoletion(removed, txnLogs, obsoletions, accumulate);
             try
             {
                 txnLogs.finish();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b32a9e64/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 3549523..1b8e265 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -18,14 +18,21 @@
 */
 package org.apache.cassandra.db.lifecycle;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -158,12 +165,23 @@ public class HelpersTest
     @Test
     public void testMarkObsolete()
     {
+        testMarkObsoleteHelper(false);
+    }
+    @Test
+    public void testBulkMarkObsolete()
+    {
+        testMarkObsoleteHelper(true);
+    }
+
+    public void testMarkObsoleteHelper(boolean bulk)
+    {
         ColumnFamilyStore cfs = MockSchema.newCFS();
         LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
         Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+        Iterable<SSTableReader> readersToKeep = Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
 
         List<LogTransaction.Obsoletion> obsoletions = new ArrayList<>();
-        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
+        Assert.assertNull(bulk ? Helpers.prepareForBulkObsoletion(readers, txnLogs, obsoletions, null) : Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
         assertNotNull(obsoletions);
         assertEquals(2, obsoletions.size());
 
@@ -172,9 +190,47 @@ public class HelpersTest
         for (SSTableReader reader : readers)
             Assert.assertTrue(reader.isMarkedCompacted());
 
+        for (SSTableReader reader : readersToKeep)
+            Assert.assertFalse(reader.isMarkedCompacted());
+
         accumulate = Helpers.markObsolete(obsoletions, null);
         assertNotNull(accumulate);
 
         txnLogs.finish();
     }
+
+    @Test
+    public void compareBulkAndNormalObsolete() throws IOException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS();
+        LogTransaction txnLogs = new LogTransaction(OperationType.UNKNOWN);
+        LogTransaction txnLogs2 = new LogTransaction(OperationType.UNKNOWN);
+
+        Collection<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
+        // add a few readers that should not be removed:
+        Lists.newArrayList(MockSchema.sstable(3, cfs), MockSchema.sstable(4, cfs));
+
+        List<LogTransaction.Obsoletion> normalObsoletions = new ArrayList<>();
+        List<LogTransaction.Obsoletion> bulkObsoletions = new ArrayList<>();
+
+        Assert.assertNull(Helpers.prepareForBulkObsoletion(readers, txnLogs, normalObsoletions, null));
+        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs2, bulkObsoletions, null));
+
+        assertEquals(Sets.newHashSet(readers), normalObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+        assertEquals(Sets.newHashSet(readers), bulkObsoletions.stream().map(obs -> obs.reader).collect(Collectors.toSet()));
+
+        Set<String> normalLogRecords = new HashSet<>();
+        Set<String> bulkLogRecords = new HashSet<>();
+
+        for (File f : txnLogs.logFiles())
+            Files.lines(f.toPath()).forEach(bulkLogRecords::add);
+        for (File f : txnLogs2.logFiles())
+            Files.lines(f.toPath()).forEach(normalLogRecords::add);
+
+        Assert.assertEquals(readers.size(), normalLogRecords.size());
+        Assert.assertEquals(bulkLogRecords, normalLogRecords);
+
+        txnLogs.finish();
+        txnLogs2.finish();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message