cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject [3/6] cassandra git commit: Follow-up to CASSANDRA-9656, fixing: * DataTracker.getCurrentVersion * keyCache wiring * offline totalDiskSpaceUsed maintenance, affecting unit tests
Date Mon, 06 Jul 2015 11:19:42 GMT
Follow-up to CASSANDRA-9656, fixing:
  * DataTracker.getCurrentVersion
  * keyCache wiring
  * offline totalDiskSpaceUsed maintenance, affecting unit tests


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec320e8a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec320e8a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec320e8a

Branch: refs/heads/trunk
Commit: ec320e8a1dd01b547d2e2cb3f0d1b994e814bda0
Parents: ca40d11
Author: Benedict Elliott Smith <benedict@apache.org>
Authored: Mon Jul 6 10:17:20 2015 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Mon Jul 6 12:12:54 2015 +0100

----------------------------------------------------------------------
 .../org/apache/cassandra/db/DataTracker.java    |  8 ++-
 .../cassandra/db/compaction/CompactionTask.java |  2 +
 .../cassandra/io/sstable/SSTableReader.java     |  5 ++
 .../cassandra/io/sstable/SSTableRewriter.java   | 16 +++--
 .../io/sstable/SSTableRewriterTest.java         | 72 ++++++++++----------
 5 files changed, 59 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/src/java/org/apache/cassandra/db/DataTracker.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java
index e84371c..ef25236 100644
--- a/src/java/org/apache/cassandra/db/DataTracker.java
+++ b/src/java/org/apache/cassandra/db/DataTracker.java
@@ -572,7 +572,13 @@ public class DataTracker
 
     public SSTableReader getCurrentVersion(SSTableReader sstable)
     {
-        return view.get().sstablesMap.get(sstable);
+        if (!sstable.isReplaced())
+            return sstable;
+        SSTableReader current = view.get().sstablesMap.get(sstable);
+        if (current == null)
+            current = Iterables.find(view.get().shadowed, Predicates.equalTo(sstable), null);
+        assert current != null : sstable + " not in live set";
+        return current;
     }
 
     public static class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 751f8f3..c6e3d2f 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -240,6 +240,8 @@ public class CompactionTask extends AbstractCompactionTask
             Collection<SSTableReader> oldSStables = this.sstables;
             if (!offline)
                 cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+            else
+                Refs.release(Refs.selfRefs(newSStables));
 
             // log a bunch of statistics about the result and save to system table compaction_history
             long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index d2b4416..7551d46 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -956,6 +956,11 @@ public class SSTableReader extends SSTable implements SelfRefCounted<SSTableRead
         }
     }
 
+    public boolean isReplaced()
+    {
+        return tidy.isReplaced;
+    }
+
     /**
      * Clone this reader with the provided start and open reason, and set the clone as replacement.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 92f74d7..82492a8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -216,7 +216,7 @@ public class SSTableRewriter
         {
             try
             {
-                sstable.markObsolete(dataTracker);
+                sstable.markObsolete(null);
                 sstable.selfRef().release();
             }
             catch (Throwable t)
@@ -245,7 +245,7 @@ public class SSTableRewriter
                 {
                     // if we've already been opened, add ourselves to the discard pile
                     discard.add(finished.reader);
-                    finished.reader.markObsolete(dataTracker);
+                    finished.reader.markObsolete(null);
                 }
             }
             catch (Throwable t)
@@ -290,6 +290,7 @@ public class SSTableRewriter
         final List<DecoratedKey> invalidateKeys = new ArrayList<>();
         if (!reset)
         {
+            newReader.setupKeyCache();
             invalidateKeys.addAll(cachedKeys.keySet());
             for (Map.Entry<DecoratedKey, RowIndexEntry> cacheKey : cachedKeys.entrySet())
                 newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue());
@@ -344,8 +345,11 @@ public class SSTableRewriter
         private InvalidateKeys(SSTableReader reader, Collection<DecoratedKey> invalidate)
         {
             this.cache = reader.getKeyCache();
-            for (DecoratedKey key : invalidate)
-                cacheKeys.add(reader.getCacheKey(key));
+            if (cache != null)
+            {
+                for (DecoratedKey key : invalidate)
+                    cacheKeys.add(reader.getCacheKey(key));
+            }
         }
 
         public void run()
@@ -500,8 +504,8 @@ public class SSTableRewriter
         {
             for (SSTableReader reader : discard)
             {
-                if (dataTracker.getCurrentVersion(reader) == reader)
-                    reader.markObsolete(dataTracker);
+                if (!reader.isReplaced())
+                    reader.markObsolete(null);
                 reader.selfRef().release();
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 73055a2..5ebfef7 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -23,9 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ThreadLocalRandom;
 
-import javax.annotation.Nullable;
-
-import com.google.common.base.Function;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
@@ -41,14 +38,10 @@ import org.apache.cassandra.db.compaction.SSTableSplitter;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.util.DataIntegrityMetadata;
-import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.metrics.StorageMetrics;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.concurrent.Ref;
-import org.apache.cassandra.utils.concurrent.Refs;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -63,7 +56,8 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
+
         for (int j = 0; j < 100; j ++)
         {
             ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
@@ -99,7 +93,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -130,7 +124,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -176,9 +170,7 @@ public class SSTableRewriterTest extends SchemaLoader
         validateCFS(cfs);
         int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0);
         assertEquals(1, filecounts);
-        cfs.truncateBlocking();
-        Thread.sleep(1000); // make sure the deletion tasks have run etc
-        validateCFS(cfs);
+        truncate(cfs);
     }
 
     @Test
@@ -186,7 +178,8 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
+
         ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
         for (int i = 0; i < 1000; i++)
             cf.addColumn(Util.column(String.valueOf(i), "a", 1));
@@ -226,7 +219,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -274,7 +267,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -400,7 +393,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -437,7 +430,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -485,7 +478,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         cfs.disableAutoCompaction();
 
         SSTableReader s = writeFile(cfs, 1000);
@@ -513,9 +506,7 @@ public class SSTableRewriterTest extends SchemaLoader
             cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION);
             Thread.sleep(1000);
             assertFileCounts(s.descriptor.directory.list(), 0, 0);
-            cfs.truncateBlocking();
-            Thread.sleep(1000); // make sure the deletion tasks have run etc
-            validateCFS(cfs);
+            truncate(cfs);
         }
         catch (Throwable t)
         {
@@ -529,7 +520,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         cfs.disableAutoCompaction();
 
         SSTableReader s = writeFile(cfs, 400);
@@ -572,7 +563,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         cfs.disableAutoCompaction();
         SSTableReader s = writeFile(cfs, 1000);
         cfs.getDataTracker().markCompacting(Arrays.asList(s), true, false);
@@ -580,11 +571,13 @@ public class SSTableRewriterTest extends SchemaLoader
         splitter.split();
         Thread.sleep(1000);
         assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        s.selfRef().release();
         for (File f : s.descriptor.directory.listFiles())
         {
             // we need to clear out the data dir, otherwise tests running after this breaks
             f.delete();
         }
+        truncate(cfs);
     }
 
     @Test
@@ -614,7 +607,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         SSTableReader s = writeFile(cfs, 1000);
         if (!offline)
             cfs.addSSTable(s);
@@ -655,11 +648,11 @@ public class SSTableRewriterTest extends SchemaLoader
             assertEquals(1, cfs.getSSTables().size());
             validateCFS(cfs);
         }
-        cfs.truncateBlocking();
-        Thread.sleep(1000);
+        truncate(cfs);
         filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0);
         if (offline)
         {
+            s.selfRef().release();
             // the file is not added to the CFS, therefor not truncated away above
             assertEquals(1, filecount);
             for (File f : s.descriptor.directory.listFiles())
@@ -670,7 +663,7 @@ public class SSTableRewriterTest extends SchemaLoader
         }
 
         assertEquals(0, filecount);
-
+        truncate(cfs);
     }
 
     @Test
@@ -678,7 +671,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
         for (int i = 0; i < 100; i++)
         {
             DecoratedKey key = Util.dk(Integer.toString(i));
@@ -727,6 +720,7 @@ public class SSTableRewriterTest extends SchemaLoader
         validateKeys(keyspace);
         Thread.sleep(1000);
         validateCFS(cfs);
+        truncate(cfs);
     }
 
     @Test
@@ -734,7 +728,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -765,9 +759,7 @@ public class SSTableRewriterTest extends SchemaLoader
         }
         writer.abort();
         cfs.getDataTracker().unmarkCompacting(sstables);
-        cfs.truncateBlocking();
-        SSTableDeletingTask.waitForDeletions();
-        validateCFS(cfs);
+        truncate(cfs);
     }
 
     /**
@@ -780,7 +772,7 @@ public class SSTableRewriterTest extends SchemaLoader
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
-        cfs.truncateBlocking();
+        truncate(cfs);
 
         SSTableReader s = writeFile(cfs, 1000);
         cfs.addSSTable(s);
@@ -813,9 +805,7 @@ public class SSTableRewriterTest extends SchemaLoader
         writer.abort();
         writer2.abort();
         cfs.getDataTracker().unmarkCompacting(sstables);
-        cfs.truncateBlocking();
-        SSTableDeletingTask.waitForDeletions();
-        validateCFS(cfs);
+        truncate(cfs);
     }
 
 
@@ -870,8 +860,16 @@ public class SSTableRewriterTest extends SchemaLoader
             }
         }
         assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+        assertTrue("" + cfs.getTotalDiskSpaceUsed(), cfs.getTotalDiskSpaceUsed() >= 0);
     }
 
+    private void truncate(ColumnFamilyStore cfs)
+    {
+        cfs.truncateBlocking();
+        SSTableDeletingTask.waitForDeletions();
+        validateCFS(cfs);
+        assertTrue(cfs.getTotalDiskSpaceUsed() == 0);
+    }
 
     private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
     {


Mime
View raw message