Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2C00A19931 for ; Wed, 6 Apr 2016 14:53:10 +0000 (UTC) Received: (qmail 89468 invoked by uid 500); 6 Apr 2016 14:53:09 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 89434 invoked by uid 500); 6 Apr 2016 14:53:09 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 89414 invoked by uid 99); 6 Apr 2016 14:53:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Apr 2016 14:53:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BA86ADFFAE; Wed, 6 Apr 2016 14:53:09 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: samt@apache.org To: commits@cassandra.apache.org Date: Wed, 06 Apr 2016 14:53:09 -0000 Message-Id: <637a8792ad8a4931af770b1c638d08e9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/3] cassandra git commit: Notify registered indexes of expired rows during compaction Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 1681c18c2 -> 424593205 refs/heads/trunk ee4bada2f -> b680ddd61 Notify registered indexes of expired rows during compaction Patch by Sam Tunnicliffe; reviewed by Sylvain Lebresne for CASSANDRA-11329 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/42459320 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/42459320 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/42459320 Branch: refs/heads/cassandra-3.0 Commit: 42459320586636c6dcbec9f56544d8a5256a3412 Parents: 1681c18 Author: Sam Tunnicliffe Authored: Wed Mar 9 18:58:47 2016 +0000 Committer: Sam Tunnicliffe Committed: Wed Apr 6 15:18:23 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/index/SecondaryIndexManager.java | 19 +++++++++----- .../index/internal/CassandraIndex.java | 2 +- .../apache/cassandra/index/CustomIndexTest.java | 27 ++++++++++++++++++++ 4 files changed, 41 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8f4329a..26ab66d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.6 + * Notify indexers of expired rows during compaction (CASSANDRA-11329) * Properly respond with ProtocolError when a v1/v2 native protocol header is received (CASSANDRA-11464) * Validate that num_tokens and initial_token are consistent with one another (CASSANDRA-10120) http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/src/java/org/apache/cassandra/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java index 16cb9c4..0a2e128 100644 --- a/src/java/org/apache/cassandra/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/index/SecondaryIndexManager.java @@ -902,6 +902,8 @@ public class SecondaryIndexManager implements IndexRegistry { public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original) { + if (original != null && (merged == null || !merged.isLive(nowInSec))) + getBuilder(i, clustering).addPrimaryKeyLivenessInfo(original); } public void onDeletion(int i, Clustering clustering, Row.Deletion merged, Row.Deletion original) @@ -914,15 +916,18 @@ public class SecondaryIndexManager implements IndexRegistry public void onCell(int i, Clustering clustering, Cell merged, Cell original) { - if (original != null && merged == null) + if (original != null && (merged == null || !merged.isLive(nowInSec))) + getBuilder(i, clustering).addCell(original); + } + + private Row.Builder getBuilder(int index, Clustering clustering) + { + if (builders[index] == null) { - if (builders[i] == null) - { - builders[i] = BTreeRow.sortedBuilder(); - builders[i].newRow(clustering); - } - builders[i].addCell(original); + builders[index] = BTreeRow.sortedBuilder(); + builders[index].newRow(clustering); } + return builders[index]; } }; http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 74d3f5d..4bbf682 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -380,7 +380,7 @@ public abstract class CassandraIndex implements Index public void removeRow(Row row) { if (isPrimaryKeyIndex()) - indexPrimaryKey(row.clustering(), row.primaryKeyLivenessInfo(), row.deletion()); + return; if (indexedColumn.isComplex()) removeCells(row.clustering(), row.getComplexColumnData(indexedColumn)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/42459320/test/unit/org/apache/cassandra/index/CustomIndexTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/index/CustomIndexTest.java b/test/unit/org/apache/cassandra/index/CustomIndexTest.java index 0b553f4..9de3606 100644 --- a/test/unit/org/apache/cassandra/index/CustomIndexTest.java +++ b/test/unit/org/apache/cassandra/index/CustomIndexTest.java @@ -2,6 +2,7 @@ package org.apache.cassandra.index; import java.util.*; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -537,6 +538,32 @@ public class CustomIndexTest extends CQLTester } @Test + public void notifyIndexersOfExpiredRowsDuringCompaction() throws Throwable + { + createTable("CREATE TABLE %s (k int, c int, PRIMARY KEY (k,c))"); + createIndex(String.format("CREATE CUSTOM INDEX row_ttl_test_index ON %%s() USING '%s'", StubIndex.class.getName())); + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); + StubIndex index = (StubIndex)cfs.indexManager.getIndexByName("row_ttl_test_index"); + + execute("INSERT INTO %s (k, c) VALUES (?, ?) USING TTL 1", 0, 0); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 1); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 0, 2); + execute("INSERT INTO %s (k, c) VALUES (?, ?)", 3, 3); + assertEquals(4, index.rowsInserted.size()); + // flush so that we end up with an expiring row in the first sstable + flush(); + + // let the row with the ttl expire, then force a compaction + TimeUnit.SECONDS.sleep(2); + compact(); + + // the index should have been notified of the expired row + assertEquals(1, index.rowsDeleted.size()); + Integer deletedClustering = Int32Type.instance.compose(index.rowsDeleted.get(0).clustering().get(0)); + assertEquals(0, deletedClustering.intValue()); + } + + @Test public void validateOptions() throws Throwable { createTable("CREATE TABLE %s(k int, c int, v1 int, v2 int, PRIMARY KEY(k,c))");