Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D4EE9D9DC for ; Thu, 30 Aug 2012 14:08:03 +0000 (UTC) Received: (qmail 52472 invoked by uid 500); 30 Aug 2012 14:08:03 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 52446 invoked by uid 500); 30 Aug 2012 14:08:03 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 52430 invoked by uid 99); 30 Aug 2012 14:08:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 30 Aug 2012 14:08:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 304802262E; Thu, 30 Aug 2012 14:08:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: redesign KEYS indexes to avoid read-before-write patch by Sam Tunnicliffe, jbellis, and Philip Jenvey for CASSANDRA-2897 Message-Id: <20120830140803.304802262E@tyr.zones.apache.org> Date: Thu, 30 Aug 2012 14:08:03 +0000 (UTC) Updated Branches: refs/heads/trunk 69cedbfca -> 8a1b93d79 redesign KEYS indexes to avoid read-before-write patch by Sam Tunnicliffe, jbellis, and Philip Jenvey for CASSANDRA-2897 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a1b93d7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a1b93d7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a1b93d7 Branch: refs/heads/trunk Commit: 8a1b93d79f39b80979f5403de1fac4f7e8d7cb02 Parents: 69cedbf Author: Jonathan Ellis Authored: Thu Aug 30 09:07:05 2012 -0500 Committer: Jonathan Ellis Committed: Thu Aug 30 09:07:41 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/AbstractColumnContainer.java | 5 +- .../db/AbstractThreadUnsafeSortedColumns.java | 9 +- .../cassandra/db/ArrayBackedSortedColumns.java | 6 + .../apache/cassandra/db/AtomicSortedColumns.java | 24 +- src/java/org/apache/cassandra/db/ColumnFamily.java | 1 + .../org/apache/cassandra/db/ColumnFamilyStore.java | 27 +- src/java/org/apache/cassandra/db/DeletionInfo.java | 38 -- .../org/apache/cassandra/db/ISortedColumns.java | 3 +- src/java/org/apache/cassandra/db/Memtable.java | 9 +- src/java/org/apache/cassandra/db/Table.java | 150 +-------- .../cassandra/db/ThreadSafeSortedColumns.java | 5 +- .../cassandra/db/TreeMapBackedSortedColumns.java | 61 +++- .../db/compaction/LazilyCompactedRow.java | 19 +- .../db/compaction/ParallelCompactionIterable.java | 7 +- .../cassandra/db/compaction/PrecompactedRow.java | 21 +- .../AbstractSimplePerColumnSecondaryIndex.java | 22 +- .../db/index/PerColumnSecondaryIndex.java | 9 +- .../cassandra/db/index/PerRowSecondaryIndex.java | 21 +- .../cassandra/db/index/SecondaryIndexManager.java | 264 ++++++++++----- .../cassandra/db/index/SecondaryIndexSearcher.java | 10 + .../db/index/composites/CompositesSearcher.java | 12 + .../cassandra/db/index/keys/KeysSearcher.java | 20 ++ .../io/sstable/SSTableIdentityIterator.java | 4 +- test/unit/org/apache/cassandra/SchemaLoader.java | 21 ++- .../apache/cassandra/config/CFMetaDataTest.java | 14 + .../apache/cassandra/db/ColumnFamilyStoreTest.java | 138 ++++++++ .../cassandra/db/SecondaryIndexColumnSizeTest.java | 22 +- 28 files changed, 578 insertions(+), 365 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9d09d57..ce7ec50 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.2-dev * new metrics (CASSANDRA-4009) + * redesign KEYS indexes to avoid read-before-write (CASSANDRA-2897) * debug tracing (CASSANDRA-1123) * parallelize row cache loading (CASSANDRA-4282) * Make compaction, flush JBOD-aware (CASSANDRA-4292) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/AbstractColumnContainer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java index 9ad0fda..ab93c54 100644 --- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java +++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java @@ -26,6 +26,7 @@ import com.google.common.base.Function; import com.google.common.base.Functions; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.util.IIterableColumns; import org.apache.cassandra.utils.Allocator; @@ -83,9 +84,9 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter columns.maybeResetDeletionTimes(gcBefore); } - public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function transformation) + public long addAllWithSizeDelta(AbstractColumnContainer cc, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer) { - return columns.addAllWithSizeDelta(cc.columns, allocator, transformation); + return columns.addAllWithSizeDelta(cc.columns, allocator, transformation, indexer); } public void addAll(AbstractColumnContainer cc, Allocator allocator, Function transformation) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java index 0e71cb3..31f6877 100644 --- a/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AbstractThreadUnsafeSortedColumns.java @@ -21,6 +21,7 @@ import java.util.Iterator; import com.google.common.base.Function; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.Allocator; @@ -90,14 +91,6 @@ public abstract class AbstractThreadUnsafeSortedColumns implements ISortedColumn } } - public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation) - { - // sizeDelta is only needed by memtable updates which should not be using thread-unsafe containers - throw new UnsupportedOperationException(); - } - - public abstract void addAll(ISortedColumns columns, Allocator allocator, Function transformation); - public boolean isEmpty() { return size() == 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java index 8af667f..8d813a3 100644 --- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java @@ -25,6 +25,7 @@ import com.google.common.collect.AbstractIterator; import com.google.common.collect.Lists; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.Allocator; @@ -206,6 +207,11 @@ public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns return -mid - (result < 0 ? 1 : 2); } + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer) + { + throw new UnsupportedOperationException(); + } + public void addAll(ISortedColumns cm, Allocator allocator, Function transformation) { delete(cm.getDeletionInfo()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/AtomicSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java index f1629ab..83aabea 100644 --- a/src/java/org/apache/cassandra/db/AtomicSortedColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicSortedColumns.java @@ -25,6 +25,7 @@ import com.google.common.base.Function; import edu.stanford.ppl.concurrent.SnapTreeMap; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.Allocator; @@ -150,17 +151,17 @@ public class AtomicSortedColumns implements ISortedColumns { current = ref.get(); modified = current.cloneMe(); - modified.addColumn(column, allocator); + modified.addColumn(column, allocator, SecondaryIndexManager.nullUpdater); } while (!ref.compareAndSet(current, modified)); } public void addAll(ISortedColumns cm, Allocator allocator, Function transformation) { - addAllWithSizeDelta(cm, allocator, transformation); + addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater); } - public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation) + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer) { /* * This operation needs to atomicity and isolation. To that end, we @@ -186,7 +187,7 @@ public class AtomicSortedColumns implements ISortedColumns for (IColumn column : cm.getSortedColumns()) { - sizeDelta += modified.addColumn(transformation.apply(column), allocator); + sizeDelta += modified.addColumn(transformation.apply(column), allocator, indexer); // bail early if we know we've been beaten if (ref.get() != current) continue main_loop; @@ -335,14 +336,17 @@ public class AtomicSortedColumns implements ISortedColumns return new Holder(new SnapTreeMap(map.comparator()), deletionInfo); } - long addColumn(IColumn column, Allocator allocator) + long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer) { ByteBuffer name = column.name(); while (true) { IColumn oldColumn = map.putIfAbsent(name, column); if (oldColumn == null) + { + indexer.insert(column); return column.dataSize(); + } if (oldColumn instanceof SuperColumn) { @@ -353,11 +357,17 @@ public class AtomicSortedColumns implements ISortedColumns } else { - // calculate reconciled col from old (existing) col and new col IColumn reconciledColumn = column.reconcile(oldColumn, allocator); if (map.replace(name, oldColumn, reconciledColumn)) + { + // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting + // we need to make sure we update indexes no matter the order we merge + if (reconciledColumn == column) + indexer.update(oldColumn, reconciledColumn); + else + indexer.update(column, reconciledColumn); return reconciledColumn.dataSize() - oldColumn.dataSize(); - + } // We failed to replace column due to a concurrent update or a concurrent removal. Keep trying. // (Currently, concurrent removal should not happen (only updates), but let us support that anyway.) } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamily.java b/src/java/org/apache/cassandra/db/ColumnFamily.java index c2dd118..d70ee49 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.UUID; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.utils.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 464bac6..65e3fd4 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; + import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -712,12 +713,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean * param @ key - key for update/insert * param @ columnFamily - columnFamily changes */ - public void apply(DecoratedKey key, ColumnFamily columnFamily) + public void apply(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer) { long start = System.nanoTime(); Memtable mt = getMemtableThreadSafe(); - mt.put(key, columnFamily); + mt.put(key, columnFamily, indexer); updateRowCache(key, columnFamily); metric.writeLatency.addNano(System.nanoTime() - start); @@ -742,44 +743,54 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return cf.getColumnCount() == 0 && !cf.isMarkedForDelete() ? null : cf; } + public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore) + { + return removeDeleted(cf, gcBefore, SecondaryIndexManager.nullUpdater); + } + /* This is complicated because we need to preserve deleted columns, supercolumns, and columnfamilies until they have been deleted for at least GC_GRACE_IN_SECONDS. But, we do not need to preserve their contents; just the object itself as a "tombstone" that can be used to repair other replicas that do not know about the deletion. */ - public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore) + public static ColumnFamily removeDeleted(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) { if (cf == null) { return null; } - removeDeletedColumnsOnly(cf, gcBefore); + removeDeletedColumnsOnly(cf, gcBefore, indexer); return removeDeletedCF(cf, gcBefore); } - public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore) + private static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) { if (cf.isSuper()) removeDeletedSuper(cf, gcBefore); else - removeDeletedStandard(cf, gcBefore); + removeDeletedStandard(cf, gcBefore, indexer); + } + + public static void removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore) + { + removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater); } - private static void removeDeletedStandard(ColumnFamily cf, int gcBefore) + private static void removeDeletedStandard(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) { Iterator iter = cf.iterator(); while (iter.hasNext()) { IColumn c = iter.next(); - ByteBuffer cname = c.name(); // remove columns if // (a) the column itself is gcable or // (b) the column is shadowed by a CF tombstone if (c.getLocalDeletionTime() < gcBefore || cf.deletionInfo().isDeleted(c)) { iter.remove(); + indexer.remove(c); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/DeletionInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionInfo.java b/src/java/org/apache/cassandra/db/DeletionInfo.java index 480d50a..b63686f 100644 --- a/src/java/org/apache/cassandra/db/DeletionInfo.java +++ b/src/java/org/apache/cassandra/db/DeletionInfo.java @@ -27,7 +27,6 @@ import java.util.*; import com.google.common.base.Objects; import com.google.common.collect.Iterables; -import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.ISSTableSerializer; @@ -116,43 +115,6 @@ public class DeletionInfo } /** - * Return the slice range covered by this deletion info or null is nothing is deleted. - */ - public ColumnSlice[] coveredSlices() - { - if (isLive()) - return null; - - if (!topLevel.equals(DeletionTime.LIVE)) - return ColumnSlice.ALL_COLUMNS_ARRAY; - - List slices = new ArrayList(); - ColumnSlice current = null; - for (RangeTombstone tombstone : ranges) - { - if (current == null) - { - current = new ColumnSlice(tombstone.min, tombstone.max); - } - else if (ranges.comparator().compare(current.finish, tombstone.min) < 0) - { - // If next if strictly after current, we've finish current slice - slices.add(current); - current = new ColumnSlice(tombstone.min, tombstone.max); - } - else if (ranges.comparator().compare(current.finish, tombstone.max) < 0) - { - // if tombstone end if after current end, extend current - current = new ColumnSlice(current.start, tombstone.max); - } - // otherwise, tombstone is fully included in current already, skip it - } - if (current != null) - slices.add(current); - return slices.isEmpty() ? null : slices.toArray(new ColumnSlice[slices.size()]); - } - - /** * Return a new DeletionInfo correspond to purging every tombstones that * are older than {@code gcbefore}. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ISortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ISortedColumns.java b/src/java/org/apache/cassandra/db/ISortedColumns.java index f80a249..5ccba19 100644 --- a/src/java/org/apache/cassandra/db/ISortedColumns.java +++ b/src/java/org/apache/cassandra/db/ISortedColumns.java @@ -26,6 +26,7 @@ import java.util.SortedSet; import com.google.common.base.Function; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.util.IIterableColumns; import org.apache.cassandra.utils.Allocator; @@ -74,7 +75,7 @@ public interface ISortedColumns extends IIterableColumns * * @return the difference in size seen after merging the given columns */ - public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation); + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer); /** * Adds the columns without necessarily computing the size delta http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index aa6789d..4424811 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Function; import com.google.common.base.Throwables; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.util.DiskAwareRunnable; import org.cliffc.high_scale_lib.NonBlockingHashSet; import org.github.jamm.MemoryMeter; @@ -148,10 +149,10 @@ public class Memtable * (CFS handles locking to avoid submitting an op * to a flushing memtable. Any other way is unsafe.) */ - void put(DecoratedKey key, ColumnFamily columnFamily) + void put(DecoratedKey key, ColumnFamily columnFamily, SecondaryIndexManager.Updater indexer) { assert !isFrozen; // not 100% foolproof but hell, it's an assert - resolve(key, columnFamily); + resolve(key, columnFamily, indexer); } public void updateLiveRatio() throws RuntimeException @@ -222,7 +223,7 @@ public class Memtable meterExecutor.submit(runnable); } - private void resolve(DecoratedKey key, ColumnFamily cf) + private void resolve(DecoratedKey key, ColumnFamily cf, SecondaryIndexManager.Updater indexer) { ColumnFamily previous = columnFamilies.get(key); @@ -236,7 +237,7 @@ public class Memtable previous = empty; } - long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction); + long sizeDelta = previous.addAllWithSizeDelta(cf, allocator, localCopyFunction, indexer); currentSize.addAndGet(sizeDelta); currentOperations.addAndGet((cf.getColumnCount() == 0) ? cf.isMarkedForDelete() ? 1 : 0 http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index 5b4520d..66fab5b 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -35,12 +35,12 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; /** * It represents a Keyspace. @@ -376,68 +376,7 @@ public class Table continue; } - ColumnSlice[] deletionSlices = null; - SortedSet mutatedIndexedColumns = null; - if (updateIndexes) - { - // If cf has some range deletion, we need to fetch those ranges to know if something indexed was updated - // Note: we could "optimize" that for Keys index, because we know that the columnDef name is directly - // the indexed column name. - deletionSlices = cf.deletionInfo().coveredSlices(); - - for (IColumn updated : cf) - { - if (cfs.indexManager.indexes(updated)) - { - if (mutatedIndexedColumns == null) - mutatedIndexedColumns = new TreeSet(cf.getComparator()); - mutatedIndexedColumns.add(updated.name()); - if (logger.isDebugEnabled()) - { - logger.debug(String.format("Mutated indexed column %s value %s", - cf.getComparator().getString(updated.name()), - ByteBufferUtil.bytesToHex(updated.value()))); - } - } - } - } - - // Sharding the lock is insufficient to avoid contention when there is a "hot" row, e.g., for - // hint writes when a node is down (keyed by target IP). So it is worth special-casing the - // no-index case to avoid the synchronization. - if (mutatedIndexedColumns == null && deletionSlices == null) - { - cfs.apply(key, cf); - continue; - } - // else mutatedIndexedColumns != null - synchronized (indexLockFor(mutation.key())) - { - if (mutatedIndexedColumns == null) - mutatedIndexedColumns = new TreeSet(cf.getComparator()); - - // with the raw data CF, we can just apply every update in any order and let - // read-time resolution throw out obsolete versions, thus avoiding read-before-write. - // but for indexed data we need to make sure that we're not creating index entries - // for obsolete writes. - ColumnFamily oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns, deletionSlices); - - // We might still have no mutated columns in case it is a deletion but the row had - // no indexed columns - if (mutatedIndexedColumns.isEmpty()) - { - cfs.apply(key, cf); - continue; - } - - logger.debug("Pre-mutation index row is {}", oldIndexedColumns); - ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns); - - cfs.apply(key, cf); - - // ignore full index memtables -- we flush those when the "master" one is full - cfs.indexManager.applyIndexUpdates(mutation.key(), cf, mutatedIndexedColumns, oldIndexedColumns); - } + cfs.apply(key, cf, updateIndexes ? cfs.indexManager.updaterFor(key, true) : SecondaryIndexManager.nullUpdater); } } finally @@ -446,86 +385,14 @@ public class Table } } - private static void ignoreObsoleteMutations(ColumnFamily cf, SortedSet mutatedIndexedColumns, ColumnFamily oldIndexedColumns) - { - // DO NOT modify the cf object here, it can race w/ the CL write (see https://issues.apache.org/jira/browse/CASSANDRA-2604) - - if (oldIndexedColumns == null) - return; - - for (Iterator iter = mutatedIndexedColumns.iterator(); iter.hasNext(); ) - { - ByteBuffer name = iter.next(); - IColumn newColumn = cf.getColumn(name); // null == row delete or it wouldn't be marked Mutated - if (newColumn != null && cf.isMarkedForDelete()) - { - // row is marked for delete, but column was also updated. if column is timestamped less than - // the row tombstone, treat it as if it didn't exist. Otherwise we don't care about row - // tombstone for the purpose of the index update and we can proceed as usual. - if (cf.deletionInfo().isDeleted(newColumn)) - { - // don't remove from the cf object; that can race w/ CommitLog write. Leaving it is harmless. - newColumn = null; - } - } - IColumn oldColumn = oldIndexedColumns.getColumn(name); - - // deletions are irrelevant to the index unless we're changing state from live -> deleted, i.e., - // just updating w/ a newer tombstone doesn't matter - boolean bothDeleted = (newColumn == null || newColumn.isMarkedForDelete()) - && (oldColumn == null || oldColumn.isMarkedForDelete()); - // obsolete means either the row or the column timestamp we're applying is older than existing data - boolean obsoleteRowTombstone = newColumn == null && oldColumn != null && !cf.deletionInfo().isDeleted(oldColumn); - boolean obsoleteColumn = newColumn != null && (oldIndexedColumns.deletionInfo().isDeleted(newColumn) - || (oldColumn != null && oldColumn.reconcile(newColumn) == oldColumn)); - - if (bothDeleted || obsoleteRowTombstone || obsoleteColumn) - { - if (logger.isDebugEnabled()) - logger.debug("skipping index update for obsolete mutation of " + cf.getComparator().getString(name)); - iter.remove(); - oldIndexedColumns.remove(name); - } - } - } - - private static ColumnFamily readCurrentIndexedColumns(DecoratedKey key, ColumnFamilyStore cfs, SortedSet mutatedIndexedColumns, ColumnSlice[] deletionSlices) - { - // Note: we could only query names not covered by the slices - QueryPath path = new QueryPath(cfs.getColumnFamilyName()); - ColumnFamily cf = ColumnFamily.create(cfs.metadata); - - if (mutatedIndexedColumns != null) - cf.resolve(cfs.getColumnFamily(QueryFilter.getNamesFilter(key, path, mutatedIndexedColumns))); - - if (deletionSlices != null) - { - SliceQueryPager pager = new SliceQueryPager(cfs, key, deletionSlices); - while (pager.hasNext()) - { - ColumnFamily cf2 = pager.next(); - cf.delete(cf2); - for (IColumn column : cf2) - { - if (cfs.indexManager.indexes(column)) - { - cf.addColumn(column); - mutatedIndexedColumns.add(column.name()); - } - } - } - } - return cf; - } - public AbstractReplicationStrategy getReplicationStrategy() { return replicationStrategy; } /** - * @param key row to index - * @param cfs ColumnFamily to index row in + * @param key row to index + * @param cfs ColumnFamily to index row in * @param idxNames columns to index, in comparator order */ public static void indexRow(DecoratedKey key, ColumnFamilyStore cfs, Set idxNames) @@ -533,13 +400,14 @@ public class Table if (logger.isDebugEnabled()) logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key)); + Collection indexes = cfs.indexManager.getIndexesByNames(idxNames); + switchLock.readLock().lock(); try { // Our index lock is per-row, but we don't want to hold writes for too long, so for large rows // we release the lock between pages SliceQueryPager pager = new SliceQueryPager(cfs, key, ColumnSlice.ALL_COLUMNS_ARRAY); - while (pager.hasNext()) { synchronized (cfs.table.indexLockFor(key.key)) @@ -548,10 +416,10 @@ public class Table ColumnFamily cf2 = cf.cloneMeShallow(); for (IColumn column : cf) { - if (cfs.indexManager.indexes(column.name(), idxNames)) + if (cfs.indexManager.indexes(column.name(), indexes)) cf2.addColumn(column); } - cfs.indexManager.applyIndexUpdates(key.key, cf2, cf2.getColumnNames(), null); + cfs.indexManager.indexRow(key.key, cf2); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java index 0a32b1c..6aa19f0 100644 --- a/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ThreadSafeSortedColumns.java @@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import com.google.common.base.Function; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.Allocator; @@ -125,11 +126,11 @@ public class ThreadSafeSortedColumns extends AbstractThreadUnsafeSortedColumns i */ public void addAll(ISortedColumns cm, Allocator allocator, Function transformation) { - addAllWithSizeDelta(cm, allocator, transformation); + addAllWithSizeDelta(cm, allocator, transformation, null); } @Override - public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation) + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer) { delete(cm.getDeletionInfo()); long sizeDelta = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java index 96e1e5d..c4ec52f 100644 --- a/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/TreeMapBackedSortedColumns.java @@ -27,6 +27,7 @@ import java.util.TreeMap; import com.google.common.base.Function; import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.utils.Allocator; @@ -82,11 +83,16 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn return false; } + public void addColumn(IColumn column, Allocator allocator) + { + addColumn(column, allocator, SecondaryIndexManager.nullUpdater); + } + /* * If we find an old column that has the same name * the ask it to resolve itself else add the new column */ - public void addColumn(IColumn column, Allocator allocator) + public long addColumn(IColumn column, Allocator allocator, SecondaryIndexManager.Updater indexer) { ByteBuffer name = column.name(); // this is a slightly unusual way to structure this; a more natural way is shown in ThreadSafeSortedColumns, @@ -94,34 +100,51 @@ public class TreeMapBackedSortedColumns extends AbstractThreadUnsafeSortedColumn // which saves the extra "get" in the no-conflict case [for both normal and super columns], // in exchange for a re-put in the SuperColumn case. IColumn oldColumn = map.put(name, column); - if (oldColumn != null) + if (oldColumn == null) + return column.dataSize(); + + if (oldColumn instanceof SuperColumn) + { + assert column instanceof SuperColumn; + long previousSize = oldColumn.dataSize(); + // since oldColumn is where we've been accumulating results, it's usually going to be faster to + // add the new one to the old, then place old back in the Map, rather than copy the old contents + // into the new Map entry. + ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator); + map.put(name, oldColumn); + return oldColumn.dataSize() - previousSize; + } + else { - if (oldColumn instanceof SuperColumn) - { - assert column instanceof SuperColumn; - // since oldColumn is where we've been accumulating results, it's usually going to be faster to - // add the new one to the old, then place old back in the Map, rather than copy the old contents - // into the new Map entry. - ((SuperColumn) oldColumn).putColumn((SuperColumn)column, allocator); - map.put(name, oldColumn); - } + // calculate reconciled col from old (existing) col and new col + IColumn reconciledColumn = column.reconcile(oldColumn, allocator); + map.put(name, reconciledColumn); + // for memtable updates we only care about oldcolumn, reconciledcolumn, but when compacting + // we need to make sure we update indexes no matter the order we merge + if (reconciledColumn == column) + indexer.update(oldColumn, reconciledColumn); else - { - // calculate reconciled col from old (existing) col and new col - IColumn reconciledColumn = column.reconcile(oldColumn, allocator); - map.put(name, reconciledColumn); - } + indexer.update(column, reconciledColumn); + return reconciledColumn.dataSize() - oldColumn.dataSize(); } } + public long addAllWithSizeDelta(ISortedColumns cm, Allocator allocator, Function transformation, SecondaryIndexManager.Updater indexer) + { + delete(cm.getDeletionInfo()); + for (IColumn column : cm.getSortedColumns()) + addColumn(transformation.apply(column), allocator, indexer); + + // we don't use this for memtables, so we don't bother computing size + return Long.MIN_VALUE; + } + /** * We need to go through each column in the column container and resolve it before adding */ public void addAll(ISortedColumns cm, Allocator allocator, Function transformation) { - delete(cm.getDeletionInfo()); - for (IColumn column : cm.getSortedColumns()) - addColumn(transformation.apply(column), allocator); + addAllWithSizeDelta(cm, allocator, transformation, SecondaryIndexManager.nullUpdater); } public boolean replace(IColumn oldColumn, IColumn newColumn) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index 2c1f02a..e7b99c4 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -23,6 +23,7 @@ import java.security.MessageDigest; import java.util.Iterator; import java.util.List; +import com.google.common.base.Functions; import com.google.common.base.Predicates; import com.google.common.collect.Iterators; import org.slf4j.Logger; @@ -31,10 +32,12 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.ICountableColumnIterator; import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.utils.HeapAllocator; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.StreamingHistogram; @@ -63,6 +66,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable private boolean closed; private ColumnIndex.Builder indexBuilder; private ColumnIndex columnsIndex; + private final SecondaryIndexManager.Updater indexer; public LazilyCompactedRow(CompactionController controller, List rows) { @@ -70,6 +74,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable this.rows = rows; this.controller = controller; this.shouldPurge = controller.shouldPurge(key); + indexer = controller.cfs.indexManager.updaterFor(key, false); for (OnDiskAtomIterator row : rows) { @@ -224,8 +229,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable private class Reducer extends MergeIterator.Reducer { + // all columns reduced together will have the same name, so there will only be one column + // in the container; we just want to leverage the conflict resolution code from CF ColumnFamily container = emptyColumnFamily.cloneMeShallow(); + + // tombstone reference; will be reconciled w/ column during getReduced RangeTombstone tombstone; + long serializedSize = 4; // int for column count int columns = 0; long maxTimestampSeen = Long.MIN_VALUE; @@ -234,9 +244,16 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable public void reduce(OnDiskAtom current) { if (current instanceof RangeTombstone) + { tombstone = (RangeTombstone)current; + } else - container.addColumn((IColumn)current); + { + IColumn column = (IColumn) current; + container.addColumn(column); + if (container.getColumn(column.name()) != column) + indexer.remove(column); + } } protected OnDiskAtom getReduced() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java index f696ad4..56fce20 100644 --- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java +++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.*; import java.util.concurrent.locks.Condition; +import com.google.common.base.Functions; import com.google.common.collect.AbstractIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,7 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.columniterator.ICountableColumnIterator; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.utils.*; @@ -215,7 +217,8 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable else { // addAll is ok even if cf is an ArrayBackedSortedColumns - cf.addAll(thisCF, HeapAllocator.instance); + SecondaryIndexManager.Updater indexer = controller.cfs.indexManager.updaterFor(row.key, false); + cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.identity(), indexer); } } @@ -308,7 +311,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable else { logger.debug("parallel eager deserialize from " + iter.getPath()); - queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns()))); + queue.put(new RowContainer(new Row(iter.getKey(), iter.getColumnFamilyWithColumns(TreeMapBackedSortedColumns.factory())))); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java index ab52405..7d2c65f 100644 --- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java @@ -22,7 +22,10 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.List; +import com.google.common.base.Functions; + import org.apache.cassandra.db.*; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.sstable.ColumnStats; import org.apache.cassandra.io.sstable.SSTableIdentityIterator; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -43,6 +46,8 @@ public class PrecompactedRow extends AbstractCompactedRow compactedCf = cf; } + + public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, CompactionController controller, ColumnFamily cf) { assert key != null; @@ -77,7 +82,9 @@ public class PrecompactedRow extends AbstractCompactedRow public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf) { // See comment in preceding method - ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge ? controller.gcBefore : Integer.MIN_VALUE); + ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, + shouldPurge ? controller.gcBefore : Integer.MIN_VALUE, + controller.cfs.indexManager.updaterFor(key, false)); if (shouldPurge && compacted != null && compacted.metadata().getDefaultValidator().isCommutative()) CounterColumn.mergeAndRemoveOldShards(key, compacted, controller.gcBefore, controller.mergeShardBefore); return compacted; @@ -86,19 +93,22 @@ public class PrecompactedRow extends AbstractCompactedRow public PrecompactedRow(CompactionController controller, List rows) { this(rows.get(0).getKey(), - removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows))); + removeDeletedAndOldShards(rows.get(0).getKey(), controller, merge(rows, controller))); } - private static ColumnFamily merge(List rows) + private static ColumnFamily merge(List rows, CompactionController controller) { assert !rows.isEmpty(); ColumnFamily cf = null; + SecondaryIndexManager.Updater indexer = null; for (SSTableIdentityIterator row : rows) { ColumnFamily thisCF; try { - thisCF = row.getColumnFamilyWithColumns(); + // use a map for the first once since that will be the one we merge into + ISortedColumns.Factory factory = cf == null ? TreeMapBackedSortedColumns.factory() : ArrayBackedSortedColumns.factory(); + thisCF = row.getColumnFamilyWithColumns(factory); } catch (IOException e) { @@ -108,11 +118,12 @@ public class PrecompactedRow extends AbstractCompactedRow if (cf == null) { cf = thisCF; + indexer = controller.cfs.indexManager.updaterFor(row.getKey(), false); // only init indexer once } else { // addAll is ok even if cf is an ArrayBackedSortedColumns - cf.addAll(thisCF, HeapAllocator.instance); + cf.addAllWithSizeDelta(thisCF, HeapAllocator.instance, Functions.identity(), indexer); } } return cf; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java index 24f09ab..3a012f6 100644 --- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java @@ -17,22 +17,14 @@ */ package org.apache.cassandra.db.index; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; import java.util.concurrent.ExecutionException; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.index.PerColumnSecondaryIndex; -import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.*; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; /** @@ -81,21 +73,23 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec protected abstract ByteBuffer makeIndexColumnName(ByteBuffer rowKey, IColumn column); - public void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column) + public void delete(ByteBuffer rowKey, IColumn column) { if (column.isMarkedForDelete()) return; + DecoratedKey valueKey = getIndexKeyFor(column.value()); int localDeletionTime = (int) (System.currentTimeMillis() / 1000); ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata); cfi.addTombstone(makeIndexColumnName(rowKey, column), localDeletionTime, column.timestamp()); - indexCfs.apply(valueKey, cfi); + indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater); if (logger.isDebugEnabled()) logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi); } - public void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn column) + public void insert(ByteBuffer rowKey, IColumn column) { + DecoratedKey valueKey = getIndexKeyFor(column.value()); ColumnFamily cfi = ColumnFamily.create(indexCfs.metadata); ByteBuffer name = makeIndexColumnName(rowKey, column); if (column instanceof ExpiringColumn) @@ -110,12 +104,12 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec if (logger.isDebugEnabled()) logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.key), cfi); - indexCfs.apply(valueKey, cfi); + indexCfs.apply(valueKey, cfi, SecondaryIndexManager.nullUpdater); } - public void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col) + public void update(ByteBuffer rowKey, IColumn col) { - insertColumn(valueKey, rowKey, col); + insert(rowKey, col); } public void removeIndex(ByteBuffer columnName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java index 2d8286e..01c6cd7 100644 --- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java @@ -33,29 +33,26 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex /** * Delete a column from the index * - * @param valueKey the column value which is used as the index key * @param rowKey the underlying row key which is indexed * @param col all the column info */ - public abstract void deleteColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col); + public abstract void delete(ByteBuffer rowKey, IColumn col); /** * insert a column to the index * - * @param valueKey the column value which is used as the index key * @param rowKey the underlying row key which is indexed * @param col all the column info */ - public abstract void insertColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col); + public abstract void insert(ByteBuffer rowKey, IColumn col); /** * update a column from the index * - * @param valueKey the column value which is used as the index key * @param rowKey the underlying row key which is indexed * @param col all the column info */ - public abstract void updateColumn(DecoratedKey valueKey, ByteBuffer rowKey, IColumn col); + public abstract void update(ByteBuffer rowKey, IColumn col); public String getNameForSystemTable(ByteBuffer column) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java index f2a0f60..96252c0 100644 --- a/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/PerRowSecondaryIndex.java @@ -34,27 +34,26 @@ import org.apache.cassandra.utils.ByteBufferUtil; public abstract class PerRowSecondaryIndex extends SecondaryIndex { /** - * Removes obsolete index entries and creates new ones for the given row key - * and mutated columns. + * Index the given row for new index creation. @param cf will represent the entire row. * * @param rowKey the row key * @param cf the current rows data - * @param mutatedIndexedColumns the set of columns that were changed or added - * @param oldIndexedColumns the columns which were deleted */ - public abstract void applyIndexUpdates(ByteBuffer rowKey, - ColumnFamily cf, - SortedSet mutatedIndexedColumns, - ColumnFamily oldIndexedColumns); + public abstract void index(ByteBuffer rowKey, ColumnFamily cf); + + /** + * Index the given row + * + * @param rowKey the row key + */ + public abstract void index(ByteBuffer rowKey); /** * cleans up deleted columns from cassandra cleanup compaction * * @param key - * @param indexedColumnsInRow */ - public abstract void deleteFromIndex(DecoratedKey key, List indexedColumnsInRow); - + public abstract void delete(DecoratedKey key); @Override public String getNameForSystemTable(ByteBuffer columnName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 7c87941..70a9164 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -34,12 +34,10 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.dht.LocalToken; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.thrift.Column; import org.apache.cassandra.thrift.IndexExpression; -import org.apache.cassandra.utils.ByteBufferUtil; /** * Manages all the indexes associated with a given CFS @@ -49,6 +47,15 @@ public class SecondaryIndexManager { private static final Logger logger = LoggerFactory.getLogger(SecondaryIndexManager.class); + public static final Updater nullUpdater = new Updater() + { + public void insert(IColumn column) { } + + public void update(IColumn oldColumn, IColumn column) { } + + public void remove(IColumn current) { } + }; + /** * Organizes the indexes by column name */ @@ -119,7 +126,7 @@ public class SecondaryIndexManager * Caller must acquire and release references to the sstables used here. * * @param sstables the data to build from - * @param columns the list of columns to index, ordered by comparator + * @param idxNames the list of columns to index, ordered by comparator */ public void maybeBuildSecondaryIndexes(Collection sstables, Set idxNames) { @@ -149,14 +156,19 @@ public class SecondaryIndexManager logger.info("Index build of " + idxNames + " complete"); } - public boolean indexes(ByteBuffer name, Set idxNames) + public boolean indexes(ByteBuffer name, Collection indexes) { - for (SecondaryIndex index : getIndexesByNames(idxNames)) + return indexFor(name, indexes) != null; + } + + public SecondaryIndex indexFor(ByteBuffer name, Collection indexes) + { + for (SecondaryIndex index : indexes) { if (index.indexes(name)) - return true; + return index; } - return false; + return null; } public boolean indexes(IColumn column) @@ -166,7 +178,12 @@ public class SecondaryIndexManager public boolean indexes(ByteBuffer name) { - return indexes(name, allIndexesNames()); + return indexes(name, indexesByColumn.values()); + } + + public SecondaryIndex indexFor(ByteBuffer name) + { + return indexFor(name, indexesByColumn.values()); } /** @@ -394,128 +411,83 @@ public class SecondaryIndexManager } /** - * Removes obsolete index entries and creates new ones for the given row key - * and mutated columns. - * - * For columns whos underlying index type has the isRowLevelIndex() flag set to true this function will - * call the + * When building an index against existing data, add the given row to the index * - * @param rowKey the row key + * @param key the row key * @param cf the current rows data - * @param mutatedIndexedColumns the set of columns that were changed or added - * @param oldIndexedColumns the columns what were deleted */ - public void applyIndexUpdates(ByteBuffer rowKey, - ColumnFamily cf, - SortedSet mutatedIndexedColumns, - ColumnFamily oldIndexedColumns) + public void indexRow(ByteBuffer key, ColumnFamily cf) { - - // Identify the columns with PerRowSecondaryIndexes - // we need to make sure this is only called once + // Update entire row only once per row level index Set> appliedRowLevelIndexes = null; - // remove the old index entries - if (oldIndexedColumns != null) - { - for (IColumn column : oldIndexedColumns) - { - //this was previously deleted so should not be in index - if (column.isMarkedForDelete()) - continue; - - SecondaryIndex index = getIndexForFullColumnName(column.name()); - if (index == null) - { - logger.debug("Looks like index got dropped mid-update. Skipping"); - continue; - } - - // Update entire row if we encounter a row level index - if (index instanceof PerRowSecondaryIndex) - { - if (appliedRowLevelIndexes == null) - appliedRowLevelIndexes = new HashSet>(); - - if (appliedRowLevelIndexes.add(index.getClass())) - ((PerRowSecondaryIndex)index).applyIndexUpdates(rowKey, cf, mutatedIndexedColumns, oldIndexedColumns); - } - else - { - DecoratedKey valueKey = index.getIndexKeyFor(column.value()); - ((PerColumnSecondaryIndex)index).deleteColumn(valueKey, rowKey, column); - } - } - } - - //insert new columns - for (ByteBuffer columnName : mutatedIndexedColumns) + for (Map.Entry entry : indexesByColumn.entrySet()) { - IColumn column = cf.getColumn(columnName); - if (column == null || column.isMarkedForDelete()) - continue; // null column == row deletion - - SecondaryIndex index = getIndexForFullColumnName(columnName); - if (index == null) - { - logger.debug("index on {} removed; skipping remove-old for {}", columnName, ByteBufferUtil.bytesToHex(rowKey)); - continue; - } + SecondaryIndex index = entry.getValue(); - // Update entire row if we encounter a row level index if (index instanceof PerRowSecondaryIndex) { if (appliedRowLevelIndexes == null) appliedRowLevelIndexes = new HashSet>(); if (appliedRowLevelIndexes.add(index.getClass())) - ((PerRowSecondaryIndex)index).applyIndexUpdates(rowKey, cf, mutatedIndexedColumns, oldIndexedColumns); + ((PerRowSecondaryIndex)index).index(key, cf); } else { - DecoratedKey valueKey = index.getIndexKeyFor(column.value()); + IColumn column = cf.getColumn(entry.getKey()); + if (column == null) + continue; - ((PerColumnSecondaryIndex)index).insertColumn(valueKey, rowKey, column); + ((PerColumnSecondaryIndex) index).insert(key, column); } } } /** - * Delete all columns from all indexes for this row + * Delete all columns from all indexes for this row. For when cleanup rips a row out entirely. + * * @param key the row key * @param indexedColumnsInRow all column names in row */ public void deleteFromIndexes(DecoratedKey key, List indexedColumnsInRow) { - - // Identify the columns with isRowLevelIndex == true - // we need to make sure this is only called once + // Update entire row only once per row level index Set> cleanedRowLevelIndexes = null; for (IColumn column : indexedColumnsInRow) { SecondaryIndex index = indexesByColumn.get(column.name()); - if (index == null) continue; - //Update entire row if we encounter a row level index if (index instanceof PerRowSecondaryIndex) { if (cleanedRowLevelIndexes == null) cleanedRowLevelIndexes = new HashSet>(); if (cleanedRowLevelIndexes.add(index.getClass())) - ((PerRowSecondaryIndex)index).deleteFromIndex(key, indexedColumnsInRow); + ((PerRowSecondaryIndex)index).delete(key); } else { - DecoratedKey valueKey = index.getIndexKeyFor(column.value()); - ((PerColumnSecondaryIndex) index).deleteColumn(valueKey, key.key, column); + ((PerColumnSecondaryIndex) index).delete(key.key, column); } } } + /** + * This helper acts as a closure around the indexManager + * and row key to ensure that down in Memtable's ColumnFamily implementation, the index + * can get updated. Note: only a CF backed by AtomicSortedColumns implements this behaviour + * fully, other types simply ignore the index updater. + */ + public Updater updaterFor(final DecoratedKey key, boolean includeRowIndexes) + { + return (includeRowIndexes && !rowLevelIndexMap.isEmpty()) + ? new MixedIndexUpdater(key) + : indexesByColumn.isEmpty() ? nullUpdater : new PerColumnIndexUpdater(key); + } /** * Get a list of IndexSearchers from the union of expression index types @@ -578,7 +550,7 @@ public class SecondaryIndexManager return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, maxIsColumns); } - private Collection getIndexesByNames(Set idxNames) + public Collection getIndexesByNames(Set idxNames) { List result = new ArrayList(); for (SecondaryIndex index : indexesByColumn.values()) @@ -606,4 +578,132 @@ public class SecondaryIndexManager SecondaryIndex index = getIndexForColumn(column.name); return index != null ? index.validate(column) : true; } + + public static interface Updater + { + public void insert(IColumn column); + + public void update(IColumn oldColumn, IColumn column); + + public void remove(IColumn current); + } + + private class PerColumnIndexUpdater implements Updater + { + private final DecoratedKey key; + + public PerColumnIndexUpdater(DecoratedKey key) + { + this.key = key; + } + + public void insert(IColumn column) + { + if (column.isMarkedForDelete()) + return; + + SecondaryIndex index = indexFor(column.name()); + if (index == null) + return; + + ((PerColumnSecondaryIndex) index).insert(key.key, column); + } + + public void update(IColumn oldColumn, IColumn column) + { + if (column.isMarkedForDelete()) + return; + + SecondaryIndex index = indexFor(column.name()); + if (index == null) + return; + + ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn); + ((PerColumnSecondaryIndex) index).insert(key.key, column); + } + + public void remove(IColumn column) + { + if (column.isMarkedForDelete()) + return; + + SecondaryIndex index = indexFor(column.name()); + if (index == null) + return; + + ((PerColumnSecondaryIndex) index).delete(key.key, column); + } + } + + private class MixedIndexUpdater implements Updater + { + private final DecoratedKey key; + Set> appliedRowLevelIndexes = new HashSet>(); + + public MixedIndexUpdater(DecoratedKey key) + { + this.key = key; + } + + public void insert(IColumn column) + { + if (column.isMarkedForDelete()) + return; + + SecondaryIndex index = indexFor(column.name()); + if (index == null) + return; + + if (index instanceof PerColumnSecondaryIndex) + { + ((PerColumnSecondaryIndex) index).insert(key.key, column); + } + else + { + if (appliedRowLevelIndexes.add(index.getClass())) + ((PerRowSecondaryIndex) index).index(key.key); + } + } + + public void update(IColumn oldColumn, IColumn column) + { + if (column.isMarkedForDelete()) + return; + + SecondaryIndex index = indexFor(column.name()); + if (index == null) + return; + + if (index instanceof PerColumnSecondaryIndex) + { + ((PerColumnSecondaryIndex) index).delete(key.key, oldColumn); + ((PerColumnSecondaryIndex) index).insert(key.key, column); + } + else + { + if (appliedRowLevelIndexes.add(index.getClass())) + ((PerRowSecondaryIndex) index).index(key.key); + } + } + + public void remove(IColumn column) + { + if (column.isMarkedForDelete()) + return; + + SecondaryIndex index = indexFor(column.name()); + if (index == null) + return; + + if (index instanceof PerColumnSecondaryIndex) + { + ((PerColumnSecondaryIndex) index).delete(key.key, column); + } + else + { + if (appliedRowLevelIndexes.add(index.getClass())) + ((PerRowSecondaryIndex) index).index(key.key); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java index bbd1c83..1938773 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java @@ -44,4 +44,14 @@ public abstract class SecondaryIndexSearcher * @return true this index is able to handle given clauses. */ public abstract boolean isIndexing(List clause); + + protected boolean isIndexValueStale(ColumnFamily liveData, ByteBuffer indexedColumnName, ByteBuffer indexedValue) + { + IColumn liveColumn = liveData.getColumn(indexedColumnName); + if (null == liveColumn || liveColumn.isMarkedForDelete()) + return true; + + ByteBuffer liveValue = liveColumn.value(); + return 0 != liveData.metadata().getValueValidator(indexedColumnName).compare(indexedValue, liveValue); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index 6ffb63e..9b37ae3 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -23,6 +23,7 @@ import java.util.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.index.PerColumnSecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.index.SecondaryIndexSearcher; @@ -300,6 +301,17 @@ public class CompositesSearcher extends SecondaryIndexSearcher ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, path, dataFilter)); if (newData != null) { + ByteBuffer baseColumnName = builder.copy().add(primary.column_name).build(); + ByteBuffer indexedValue = indexKey.key; + + if (isIndexValueStale(newData, baseColumnName, indexedValue)) + { + // delete the index entry w/ its own timestamp + IColumn dummyColumn = new Column(baseColumnName, indexedValue, column.timestamp()); + ((PerColumnSecondaryIndex) index).delete(dk.key, dummyColumn); + continue; + } + if (!filter.isSatisfiedBy(newData, builder)) continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index f18e041..4bd2b39 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -23,6 +23,7 @@ import java.util.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.index.PerColumnSecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.index.SecondaryIndexSearcher; @@ -31,6 +32,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.HeapAllocator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -196,6 +198,24 @@ public class KeysSearcher extends SecondaryIndexSearcher // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null if (data == null) data = ColumnFamily.create(baseCfs.metadata); + + // as in CFS.filter - extend the filter to ensure we include the columns + // from the index expressions, just in case they weren't included in the initialFilter + IFilter extraFilter = filter.getExtraFilter(data); + if (extraFilter != null) + { + ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, path, extraFilter)); + if (cf != null) + data.addAll(cf, HeapAllocator.instance); + } + + if (isIndexValueStale(data, primary.column_name, indexKey.key)) + { + // delete the index entry w/ its own timestamp + IColumn dummyColumn = new Column(primary.column_name, indexKey.key, column.timestamp()); + ((PerColumnSecondaryIndex)index).delete(dk.key, dummyColumn); + continue; + } return new Row(dk, data); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index 4710689..d9aabfb 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -227,10 +227,10 @@ public class SSTableIdentityIterator implements Comparable idxOpts = Collections.singletonMap(CompositesIndex.PREFIX_SIZE_OPTION, "1"); + final CompositeType composite = CompositeType.getInstance(Arrays.asList(new AbstractType[]{UTF8Type.instance, UTF8Type.instance})); + return new CFMetaData(ksName, + cfName, + ColumnFamilyType.Standard, + composite, + null) + .columnMetadata(new HashMap() + {{ + ByteBuffer cName = ByteBuffer.wrap("col1".getBytes(Charsets.UTF_8)); + IndexType idxType = withIdxType ? IndexType.COMPOSITES : null; + put(cName, new ColumnDefinition(cName, UTF8Type.instance, idxType, idxOpts, withIdxType ? "col1_idx" : null, 1)); + }}); + } + private static CFMetaData jdbcCFMD(String ksName, String cfName, AbstractType comp, boolean withOldCfIds) { CFMetaData cfmd = new CFMetaData(ksName, cfName, ColumnFamilyType.Standard, comp, null).defaultValidator(comp); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a1b93d7/test/unit/org/apache/cassandra/config/CFMetaDataTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java index 4a93775..8ca83ce 100644 --- a/test/unit/org/apache/cassandra/config/CFMetaDataTest.java +++ b/test/unit/org/apache/cassandra/config/CFMetaDataTest.java @@ -18,9 +18,11 @@ */ package org.apache.cassandra.config; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.HashMap; +import java.util.Map; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.cql3.QueryProcessor; @@ -123,6 +125,18 @@ public class CFMetaDataTest extends SchemaLoader { DecoratedKey k = StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(cfm.ksName)); + // This is a nasty hack to work around the fact that non-null componentIndex + // are only used by CQL (so far) so we don't expose them through thrift + // There is a CFM with componentIndex defined in Keyspace2 which is used by + // ColumnFamilyStoreTest to verify index repair (CASSANDRA-2897) + for (Map.Entry cMeta: cfm.column_metadata.entrySet()) + { + // Non-null componentIndex are only used by CQL (so far) so we don't expose + // them through thrift + if (cMeta.getValue().componentIndex != null) + cfm.column_metadata.remove(cMeta.getKey()); + } + // Test thrift conversion assert cfm.equals(CFMetaData.fromThrift(cfm.toThrift())) : String.format("\n%s\n!=\n%s", cfm, CFMetaData.fromThrift(cfm.toThrift()));