Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DE60C18990 for ; Tue, 19 Apr 2016 13:56:50 +0000 (UTC) Received: (qmail 86163 invoked by uid 500); 19 Apr 2016 13:56:46 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 86081 invoked by uid 500); 19 Apr 2016 13:56:46 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 85516 invoked by uid 99); 19 Apr 2016 13:56:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 Apr 2016 13:56:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 51FCCE027F; Tue, 19 Apr 2016 13:56:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Tue, 19 Apr 2016 13:56:52 -0000 Message-Id: <885941bd19514d42a6d73c62ab306968@git.apache.org> In-Reply-To: <771326f22448480a992993be90c4894f@git.apache.org> References: <771326f22448480a992993be90c4894f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/14] cassandra git commit: Add unit test for CASSANDRA-11548 Add unit test for CASSANDRA-11548 Patch by Paulo Motta; reviewed by Marcus Eriksson for CASSANDRA-11548 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/209ebd38 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/209ebd38 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/209ebd38 Branch: refs/heads/cassandra-2.2 Commit: 209ebd380b641c4f065e9687186f546f8a50b242 Parents: d200d13 Author: Paulo Motta Authored: Mon Apr 18 18:44:07 2016 -0300 Committer: Marcus Eriksson Committed: Tue Apr 19 15:42:36 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 4 +- .../org/apache/cassandra/db/DataTracker.java | 12 +++ .../SSTableCompactingNotification.java | 41 ++++++++ .../LongLeveledCompactionStrategyTest.java | 101 +++++++++++++++++++ 4 files changed, 155 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 90a4f23..76d3673 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,5 @@ -2.1.15 - * Replace sstables on DataTracker before marking them as non-compacting during anti-compaction (CASSANDRA-11548) - 2.1.14 + * Replace sstables on DataTracker before marking them as non-compacting during anti-compaction (CASSANDRA-11548) * Checking if an unlogged batch is local is inefficient (CASSANDRA-11529) * Fix paging for COMPACT tables without clustering columns (CASSANDRA-11467) * Fix out-of-space error treatment in memtable flushing (CASSANDRA-11448) http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/src/java/org/apache/cassandra/db/DataTracker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DataTracker.java b/src/java/org/apache/cassandra/db/DataTracker.java index ef25236..c731a35 100644 --- a/src/java/org/apache/cassandra/db/DataTracker.java +++ b/src/java/org/apache/cassandra/db/DataTracker.java @@ -222,7 +222,10 @@ public class DataTracker View newView = currentView.markCompacting(sstables); if (view.compareAndSet(currentView, newView)) + { + notifyCompacting(sstables, true); return true; + } } } @@ -247,6 +250,8 @@ public class DataTracker // interrupted after the CFS is invalidated, those sstables need to be unreferenced as well, so we do that here. unreferenceSSTables(); } + + notifyCompacting(unmark, false); } public void markObsolete(Collection sstables, OperationType compactionType) @@ -511,6 +516,13 @@ public class DataTracker subscriber.handleNotification(notification, this); } + public void notifyCompacting(Iterable reader, boolean compacting) + { + INotification notification = new SSTableCompactingNotification(reader, compacting); + for (INotificationConsumer subscriber : subscribers) + subscriber.handleNotification(notification, this); + } + public void notifyAdded(SSTableReader added) { INotification notification = new SSTableAddedNotification(added); http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java b/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java new file mode 100644 index 0000000..6eddf3f --- /dev/null +++ b/src/java/org/apache/cassandra/notifications/SSTableCompactingNotification.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.notifications; + +import org.apache.cassandra.io.sstable.SSTableReader; + +public class SSTableCompactingNotification implements INotification +{ + public final Iterable sstables; + public final boolean compacting; + + public SSTableCompactingNotification(Iterable sstables, boolean compacting) + { + this.sstables = sstables; + this.compacting = compacting; + } + + public String toString() + { + return "SSTableCompactingNotification{" + + "sstables=" + sstables + + ", compacting=" + compacting + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/209ebd38/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java index 0eb769f..fa6a31b 100644 --- a/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java +++ b/test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java @@ -19,6 +19,9 @@ package org.apache.cassandra.db.compaction; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.*; @@ -28,9 +31,19 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.db.*; +import org.apache.cassandra.dht.BytesToken; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.notifications.INotification; +import org.apache.cassandra.notifications.INotificationConsumer; +import org.apache.cassandra.notifications.SSTableCompactingNotification; +import org.apache.cassandra.notifications.SSTableListChangedNotification; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.Refs; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class LongLeveledCompactionStrategyTest extends SchemaLoader @@ -125,4 +138,92 @@ public class LongLeveledCompactionStrategyTest extends SchemaLoader } } } + + class CheckThatSSTableIsReleasedOnlyAfterCompactionFinishes implements INotificationConsumer + { + public final Set finishedCompaction = new HashSet<>(); + + boolean failed = false; + + public void handleNotification(INotification received, Object sender) + { + if (received instanceof SSTableCompactingNotification) + { + SSTableCompactingNotification notification = (SSTableCompactingNotification) received; + if (!notification.compacting) + { + for (SSTableReader reader : notification.sstables) + { + finishedCompaction.add(reader); + } + } + } + if (received instanceof SSTableListChangedNotification) + { + SSTableListChangedNotification notification = (SSTableListChangedNotification) received; + for (SSTableReader reader : notification.removed) + { + if (finishedCompaction.contains(reader)) + failed = true; + } + } + } + + boolean isFailed() + { + return failed; + } + } + + @Test + public void testAntiCompactionAfterLCS() throws Exception + { + testParallelLeveledCompaction(); + + String ksname = "Keyspace1"; + String cfname = "StandardLeveled"; + Keyspace keyspace = Keyspace.open(ksname); + ColumnFamilyStore store = keyspace.getColumnFamilyStore(cfname); + WrappingCompactionStrategy strategy = ((WrappingCompactionStrategy) store.getCompactionStrategy()); + + Collection initialSSTables = store.getUnrepairedSSTables(); + assertEquals(store.getSSTables().size(), initialSSTables.size()); + + CheckThatSSTableIsReleasedOnlyAfterCompactionFinishes checker = new CheckThatSSTableIsReleasedOnlyAfterCompactionFinishes(); + store.getDataTracker().subscribe(checker); + + //anti-compact a subset of sstables + Range range = new Range(new BytesToken("110".getBytes()), new BytesToken("111".getBytes()), store.partitioner); + List> ranges = Arrays.asList(range); + Refs refs = Refs.tryRef(initialSSTables); + if (refs == null) + throw new IllegalStateException(); + long repairedAt = 1000; + CompactionManager.instance.performAnticompaction(store, ranges, refs, repairedAt); + + //check that sstables were released only after compaction finished + assertFalse("Anti-compaction released sstable from compacting set before compaction was finished", + checker.isFailed()); + + //check there is only one global ref count + for (SSTableReader sstable : store.getSSTables()) + { + assertFalse(sstable.isMarkedCompacted()); + assertEquals(1, sstable.selfRef().globalCount()); + } + + //check that compacting status was clearedd in all sstables + assertEquals(0, store.getDataTracker().getCompacting().size()); + + //make sure readers were replaced correctly on unrepaired leveled manifest after anti-compaction + LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) strategy.getWrappedStrategies().get(1); + for (SSTableReader reader : initialSSTables) + { + Range sstableRange = new Range(reader.first.getToken(), reader.last.getToken()); + if (sstableRange.intersects(range)) + { + assertFalse(lcs.manifest.generations[reader.getSSTableLevel()].contains(reader)); + } + } + } }