Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3F0CD180BA for ; Mon, 6 Jul 2015 11:19:41 +0000 (UTC) Received: (qmail 25575 invoked by uid 500); 6 Jul 2015 11:19:41 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 25531 invoked by uid 500); 6 Jul 2015 11:19:41 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 25509 invoked by uid 99); 6 Jul 2015 11:19:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Jul 2015 11:19:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A4216E0215; Mon, 6 Jul 2015 11:19:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: benedict@apache.org To: commits@cassandra.apache.org Date: Mon, 06 Jul 2015 11:19:41 -0000 Message-Id: <3f8bc0a3131d441180854a2844827df7@git.apache.org> In-Reply-To: <2bd8a0493fee43e59669a8ad2f2c3775@git.apache.org> References: <2bd8a0493fee43e59669a8ad2f2c3775@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/6] cassandra git commit: Follow-up to CASSANDRA-9656, fixing: * DataTracker.getCurrentVersion * keyCache wiring * offline totalDiskSpaceUsed maintenance, affecting unit tests Follow-up to CASSANDRA-9656, fixing: * DataTracker.getCurrentVersion * keyCache wiring * offline totalDiskSpaceUsed maintenance, affecting unit tests Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ec320e8a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ec320e8a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ec320e8a Branch: refs/heads/cassandra-2.2 Commit: ec320e8a1dd01b547d2e2cb3f0d1b994e814bda0 Parents: ca40d11 Author: Benedict Elliott Smith Authored: Mon Jul 6 10:17:20 2015 +0100 Committer: Benedict Elliott Smith Committed: Mon Jul 6 12:12:54 2015 +0100 ---------------------------------------------------------------------- .../org/apache/cassandra/db/DataTracker.java | 8 ++- .../cassandra/db/compaction/CompactionTask.java | 2 + .../cassandra/io/sstable/SSTableReader.java | 5 ++ .../cassandra/io/sstable/SSTableRewriter.java | 16 +++-- .../io/sstable/SSTableRewriterTest.java | 72 ++++++++++---------- 5 files changed, 59 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index e84371c..ef25236 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -572,7 +572,13 @@ public class DataTracker public SSTableReader getCurrentVersion(SSTableReader sstable) { - return view.get().sstablesMap.get(sstable); + if (!sstable.isReplaced()) + return sstable; + SSTableReader current = view.get().sstablesMap.get(sstable); + if (current == null) + current = Iterables.find(view.get().shadowed, Predicates.equalTo(sstable), null); + assert current != null : sstable + " not in live set"; + return current; } public static class SSTableIntervalTree extends IntervalTree> http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 751f8f3..c6e3d2f 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -240,6 +240,8 @@ public class CompactionTask extends AbstractCompactionTask Collection oldSStables = this.sstables; if (!offline) cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType); + else + Refs.release(Refs.selfRefs(newSStables)); // log a bunch of statistics about the result and save to system table compaction_history long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index d2b4416..7551d46 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -956,6 +956,11 @@ public class SSTableReader extends SSTable implements SelfRefCounted invalidateKeys = new ArrayList<>(); if (!reset) { + newReader.setupKeyCache(); invalidateKeys.addAll(cachedKeys.keySet()); for (Map.Entry cacheKey : cachedKeys.entrySet()) newReader.cacheKey(cacheKey.getKey(), cacheKey.getValue()); @@ -344,8 +345,11 @@ public class SSTableRewriter private InvalidateKeys(SSTableReader reader, Collection invalidate) { this.cache = reader.getKeyCache(); - for (DecoratedKey key : invalidate) - cacheKeys.add(reader.getCacheKey(key)); + if (cache != null) + { + for (DecoratedKey key : invalidate) + cacheKeys.add(reader.getCacheKey(key)); + } } public void run() @@ -500,8 +504,8 @@ public class SSTableRewriter { for (SSTableReader reader : discard) { - if (dataTracker.getCurrentVersion(reader) == reader) - reader.markObsolete(dataTracker); + if (!reader.isReplaced()) + reader.markObsolete(null); reader.selfRef().release(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ec320e8a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 73055a2..5ebfef7 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -23,9 +23,6 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ThreadLocalRandom; -import javax.annotation.Nullable; - -import com.google.common.base.Function; import com.google.common.collect.Sets; import org.junit.Test; @@ -41,14 +38,10 @@ import org.apache.cassandra.db.compaction.SSTableSplitter; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; -import org.apache.cassandra.io.util.DataIntegrityMetadata; -import org.apache.cassandra.io.util.RandomAccessReader; import org.apache.cassandra.metrics.StorageMetrics; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.concurrent.Refs; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -63,7 +56,8 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); + for (int j = 0; j < 100; j ++) { ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j)); @@ -99,7 +93,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -130,7 +124,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -176,9 +170,7 @@ public class SSTableRewriterTest extends SchemaLoader validateCFS(cfs); int filecounts = assertFileCounts(sstables.iterator().next().descriptor.directory.list(), 0, 0); assertEquals(1, filecounts); - cfs.truncateBlocking(); - Thread.sleep(1000); // make sure the deletion tasks have run etc - validateCFS(cfs); + truncate(cfs); } @Test @@ -186,7 +178,8 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); + ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata); for (int i = 0; i < 1000; i++) cf.addColumn(Util.column(String.valueOf(i), "a", 1)); @@ -226,7 +219,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -274,7 +267,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -400,7 +393,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -437,7 +430,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -485,7 +478,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 1000); @@ -513,9 +506,7 @@ public class SSTableRewriterTest extends SchemaLoader cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, sstables, OperationType.COMPACTION); Thread.sleep(1000); assertFileCounts(s.descriptor.directory.list(), 0, 0); - cfs.truncateBlocking(); - Thread.sleep(1000); // make sure the deletion tasks have run etc - validateCFS(cfs); + truncate(cfs); } catch (Throwable t) { @@ -529,7 +520,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 400); @@ -572,7 +563,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); cfs.disableAutoCompaction(); SSTableReader s = writeFile(cfs, 1000); cfs.getDataTracker().markCompacting(Arrays.asList(s), true, false); @@ -580,11 +571,13 @@ public class SSTableRewriterTest extends SchemaLoader splitter.split(); Thread.sleep(1000); assertFileCounts(s.descriptor.directory.list(), 0, 0); + s.selfRef().release(); for (File f : s.descriptor.directory.listFiles()) { // we need to clear out the data dir, otherwise tests running after this breaks f.delete(); } + truncate(cfs); } @Test @@ -614,7 +607,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); if (!offline) cfs.addSSTable(s); @@ -655,11 +648,11 @@ public class SSTableRewriterTest extends SchemaLoader assertEquals(1, cfs.getSSTables().size()); validateCFS(cfs); } - cfs.truncateBlocking(); - Thread.sleep(1000); + truncate(cfs); filecount = assertFileCounts(s.descriptor.directory.list(), 0, 0); if (offline) { + s.selfRef().release(); // the file is not added to the CFS, therefor not truncated away above assertEquals(1, filecount); for (File f : s.descriptor.directory.listFiles()) @@ -670,7 +663,7 @@ public class SSTableRewriterTest extends SchemaLoader } assertEquals(0, filecount); - + truncate(cfs); } @Test @@ -678,7 +671,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); for (int i = 0; i < 100; i++) { DecoratedKey key = Util.dk(Integer.toString(i)); @@ -727,6 +720,7 @@ public class SSTableRewriterTest extends SchemaLoader validateKeys(keyspace); Thread.sleep(1000); validateCFS(cfs); + truncate(cfs); } @Test @@ -734,7 +728,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -765,9 +759,7 @@ public class SSTableRewriterTest extends SchemaLoader } writer.abort(); cfs.getDataTracker().unmarkCompacting(sstables); - cfs.truncateBlocking(); - SSTableDeletingTask.waitForDeletions(); - validateCFS(cfs); + truncate(cfs); } /** @@ -780,7 +772,7 @@ public class SSTableRewriterTest extends SchemaLoader { Keyspace keyspace = Keyspace.open(KEYSPACE); ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF); - cfs.truncateBlocking(); + truncate(cfs); SSTableReader s = writeFile(cfs, 1000); cfs.addSSTable(s); @@ -813,9 +805,7 @@ public class SSTableRewriterTest extends SchemaLoader writer.abort(); writer2.abort(); cfs.getDataTracker().unmarkCompacting(sstables); - cfs.truncateBlocking(); - SSTableDeletingTask.waitForDeletions(); - validateCFS(cfs); + truncate(cfs); } @@ -870,8 +860,16 @@ public class SSTableRewriterTest extends SchemaLoader } } assertTrue(cfs.getDataTracker().getCompacting().isEmpty()); + assertTrue("" + cfs.getTotalDiskSpaceUsed(), cfs.getTotalDiskSpaceUsed() >= 0); } + private void truncate(ColumnFamilyStore cfs) + { + cfs.truncateBlocking(); + SSTableDeletingTask.waitForDeletions(); + validateCFS(cfs); + assertTrue(cfs.getTotalDiskSpaceUsed() == 0); + } private int assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount) {